
Commit

Merge branch 'unstable' of github.com:OpenAtomFoundation/pika into fix_lock
wuxianrong committed Mar 4, 2024
2 parents 53649e7 + ca69573 commit 6ef765d
Showing 26 changed files with 115 additions and 104 deletions.
3 changes: 1 addition & 2 deletions conf/pika.conf
@@ -253,9 +253,8 @@ disable_auto_compactions : false
sync-window-size : 9000

# Maximum buffer size of a client connection.
# Only three values are valid here: [67108864(64MB) | 268435456(256MB) | 536870912(512MB)].
# [NOTICE] Master and slaves must have exactly the same value for the max-conn-rbuf-size.
# Supported Units [K|M|G]. Its default unit is in [bytes] and its default value is 268435456(256MB).
# Supported Units [K|M|G]. Its default unit is in [bytes] and its default value is 268435456(256MB). The value range is [64MB, 1GB].
max-conn-rbuf-size : 268435456


3 changes: 1 addition & 2 deletions docs/ops/config.md
@@ -142,8 +142,7 @@ identify-binlog-type : new
# Flow-control window for master-slave sync; when master-slave latency is high, raising this value can improve sync performance. Default 9000, maximum 90000.
sync-window-size : 9000
# Maximum buffer size for handling client connection requests; configurable values are 67108864(64MB), 268435456(256MB) or 536870912(512MB)
# The default is 268435456(256MB); note that master and slave must use the same value.
# Maximum buffer size for handling client connection requests; the default is 268435456(256MB) and the valid range is [64MB, 1GB]; note that master and slave must use the same value.
# If a single command exceeds this buffer size, the server automatically closes the client connection.
max-conn-rbuf-size : 268435456
8 changes: 8 additions & 0 deletions src/pika_admin.cc
@@ -2174,6 +2174,7 @@ void ConfigCmd::ConfigSet(std::shared_ptr<DB> db) {
"zset-cache-start-direction",
"zset-cache-field-num-per-key",
"cache-lfu-decay-time",
"max-conn-rbuf-size",
});
res_.AppendStringVector(replyVt);
return;
@@ -2646,6 +2647,13 @@ void ConfigCmd::ConfigSet(std::shared_ptr<DB> db) {
}
g_pika_conf->SetAclLogMaxLen(static_cast<int>(ival));
res_.AppendStringRaw("+OK\r\n");
} else if (set_item == "max-conn-rbuf-size") {
if (pstd::string2int(value.data(), value.size(), &ival) == 0 || ival < PIKA_MAX_CONN_RBUF_LB || ival > PIKA_MAX_CONN_RBUF_HB * 2) {
res_.AppendStringRaw("-ERR Invalid argument '" + value + "' for CONFIG SET 'max-conn-rbuf-size'\r\n");
return;
}
g_pika_conf->SetMaxConnRbufSize(static_cast<int>(ival));
res_.AppendStringRaw("+OK\r\n");
} else {
res_.AppendStringRaw("-ERR Unsupported CONFIG parameter: " + set_item + "\r\n");
}
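The new branch above makes max-conn-rbuf-size settable at runtime via CONFIG SET, bounded by PIKA_MAX_CONN_RBUF_LB and PIKA_MAX_CONN_RBUF_HB * 2. A minimal go-redis sketch of how a client could exercise it, assuming a local Pika instance on 127.0.0.1:9221 and the [64MB, 1GB] range described in the config comments; the program itself is illustrative and not part of this commit:

package main

import (
	"context"
	"fmt"

	"github.com/redis/go-redis/v9"
)

func main() {
	ctx := context.Background()
	client := redis.NewClient(&redis.Options{Addr: "127.0.0.1:9221"}) // assumed local Pika instance
	defer client.Close()

	// An in-range value (64MB) should be accepted and applied without a restart.
	if err := client.ConfigSet(ctx, "max-conn-rbuf-size", "67108864").Err(); err != nil {
		fmt.Println("unexpected error:", err)
	}

	// A value below 64MB should be rejected with
	// "ERR Invalid argument ... for CONFIG SET 'max-conn-rbuf-size'".
	if err := client.ConfigSet(ctx, "max-conn-rbuf-size", "1024").Err(); err != nil {
		fmt.Println("rejected as expected:", err)
	}

	// Read back the currently effective value.
	fmt.Println(client.ConfigGet(ctx, "max-conn-rbuf-size").Result())
}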
9 changes: 6 additions & 3 deletions src/pika_conf.cc
@@ -538,10 +538,12 @@ int PikaConf::Load() {
// max conn rbuf size
int tmp_max_conn_rbuf_size = PIKA_MAX_CONN_RBUF;
GetConfIntHuman("max-conn-rbuf-size", &tmp_max_conn_rbuf_size);
if (tmp_max_conn_rbuf_size == PIKA_MAX_CONN_RBUF_LB || tmp_max_conn_rbuf_size == PIKA_MAX_CONN_RBUF_HB) {
max_conn_rbuf_size_.store(tmp_max_conn_rbuf_size);
if (tmp_max_conn_rbuf_size <= PIKA_MAX_CONN_RBUF_LB) {
max_conn_rbuf_size_.store(PIKA_MAX_CONN_RBUF_LB);
} else if (tmp_max_conn_rbuf_size >= PIKA_MAX_CONN_RBUF_HB * 2) {
max_conn_rbuf_size_.store(PIKA_MAX_CONN_RBUF_HB * 2);
} else {
max_conn_rbuf_size_.store(PIKA_MAX_CONN_RBUF);
max_conn_rbuf_size_.store(tmp_max_conn_rbuf_size);
}

// rocksdb blob configure
@@ -656,6 +658,7 @@ int PikaConf::ConfigRewrite() {
SetConfInt("consensus-level", consensus_level_.load());
SetConfInt("replication-num", replication_num_.load());
SetConfStr("slow-cmd-list", pstd::Set2String(slow_cmd_set_, ','));
SetConfInt("max-conn-rbuf-size", max_conn_rbuf_size_.load());
// options for storage engine
SetConfInt("max-cache-files", max_cache_files_);
SetConfInt("max-background-compactions", max_background_compactions_);
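Note that Load() above now clamps an out-of-range max-conn-rbuf-size instead of silently resetting it to the default. A small Go sketch of the effective behavior, under the assumption that PIKA_MAX_CONN_RBUF_LB is 64MB and PIKA_MAX_CONN_RBUF_HB is 512MB (so the upper bound HB*2 is 1GB), matching the [64MB, 1GB] range in the config comments:

package main

import "fmt"

const (
	connRbufLB int64 = 64 * 1024 * 1024  // assumed value of PIKA_MAX_CONN_RBUF_LB (64MB)
	connRbufHB int64 = 512 * 1024 * 1024 // assumed value of PIKA_MAX_CONN_RBUF_HB (512MB)
)

// clampConnRbufSize mirrors the branch added to PikaConf::Load(): values at or
// below the lower bound load as 64MB, values at or above HB*2 load as 1GB, and
// anything in between is kept exactly as configured.
func clampConnRbufSize(configured int64) int64 {
	switch {
	case configured <= connRbufLB:
		return connRbufLB
	case configured >= connRbufHB*2:
		return connRbufHB * 2
	default:
		return configured
	}
}

func main() {
	fmt.Println(clampConnRbufSize(1024))                   // 67108864 (clamped up to 64MB)
	fmt.Println(clampConnRbufSize(268435456))              // 268435456 (256MB, kept)
	fmt.Println(clampConnRbufSize(4 * 1024 * 1024 * 1024)) // 1073741824 (clamped down to 1GB)
}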
3 changes: 2 additions & 1 deletion src/pika_repl_client_thread.cc
@@ -42,7 +42,8 @@ void PikaReplClientThread::ReplClientHandle::FdTimeoutHandle(int fd, const std::
}
if (ip == g_pika_server->master_ip() && port == g_pika_server->master_port() + kPortShiftReplServer &&
PIKA_REPL_ERROR != g_pika_server->repl_state() &&
PikaReplicaManager::CheckSlaveDBState(ip, port)) { // if state machine in error state, no retry
PikaReplicaManager::CheckSlaveDBState(ip, port)) {
// if the state machine is in kDBNoConnect (set by 'dbslaveof db no one'), do not retry
LOG(WARNING) << "Master conn timeout : " << ip_port << " try reconnect";
g_pika_server->ResetMetaSyncStatus();
}
2 changes: 2 additions & 0 deletions src/pika_server.cc
@@ -602,6 +602,8 @@ int32_t PikaServer::GetSlaveListString(std::string& slave_list_str) {
master_boffset.offset - sent_slave_boffset.offset;
tmp_stream << "(" << db->DBName() << ":" << lag << ")";
}
} else if (s.ok() && slave_state == SlaveState::kSlaveDbSync) {
tmp_stream << "(" << db->DBName() << ":full syncing)";
} else {
tmp_stream << "(" << db->DBName() << ":not syncing)";
}
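The extra branch above makes GetSlaveListString distinguish a database that is mid full-sync ("full syncing") from one that is simply "not syncing". A hedged Ginkgo-style fragment showing how this could be asserted from within the existing Describe block in tests/integration/replication_test.go (clientMaster, clientSlave and ctx as defined there); it assumes the slave list is surfaced through INFO replication on the master and that the check lands while a full sync is still in progress, so the trigger and timing are illustrative only:

It("reports full syncing databases in the slave list", func() {
	// Attach the slave so that a full sync is triggered against a master
	// that already holds data (the exact trigger depends on the environment).
	Expect(clientSlave.Do(ctx, "slaveof", LOCALHOST, MASTERPORT).Err()).NotTo(HaveOccurred())

	infoRes := clientMaster.Info(ctx, "replication")
	Expect(infoRes.Err()).NotTo(HaveOccurred())
	// While a database is receiving the dump, its entry is expected to read
	// "(<db>:full syncing)" rather than "(<db>:not syncing)".
	Expect(infoRes.Val()).To(ContainSubstring("full syncing"))
})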
3 changes: 1 addition & 2 deletions tests/assets/default.conf
@@ -243,9 +243,8 @@ slave-priority : 100
sync-window-size : 9000

# Maximum buffer size of a client connection.
# Only three values are valid here: [67108864(64MB) | 268435456(256MB) | 536870912(512MB)].
# [NOTICE] Master and slaves must have exactly the same value for the max-conn-rbuf-size.
# Supported Units [K|M|G]. Its default unit is in [bytes] and its default value is 268435456(256MB).
# Supported Units [K|M|G]. Its default unit is in [bytes] and its default value is 268435456(256MB). The value range is [64MB, 1GB].
max-conn-rbuf-size : 268435456


3 changes: 1 addition & 2 deletions tests/conf/pika.conf
@@ -241,9 +241,8 @@ slave-priority : 100
sync-window-size : 9000

# Maximum buffer size of a client connection.
# Only three values are valid here: [67108864(64MB) | 268435456(256MB) | 536870912(512MB)].
# [NOTICE] Master and slaves must have exactly the same value for the max-conn-rbuf-size.
# Supported Units [K|M|G]. Its default unit is in [bytes] and its default value is 268435456(256MB).
# Supported Units [K|M|G]. Its default unit is in [bytes] and its default value is 268435456(256MB). The value range is [64MB, 1GB].
max-conn-rbuf-size : 268435456


2 changes: 1 addition & 1 deletion tests/integration/cache_test.go
@@ -14,7 +14,7 @@ var _ = Describe("Cache test", func() {
var client *redis.Client

BeforeEach(func() {
client = redis.NewClient(pikaOptions1())
client = redis.NewClient(PikaOption(SINGLEADDR))
Expect(client.FlushDB(ctx).Err()).NotTo(HaveOccurred())
time.Sleep(1 * time.Second)
})
2 changes: 1 addition & 1 deletion tests/integration/csanning_test.go
@@ -15,7 +15,7 @@ var _ = Describe("Csanning Commands", func() {
var client *redis.Client

BeforeEach(func() {
client = redis.NewClient(pikaOptions1())
client = redis.NewClient(PikaOption(SINGLEADDR))
Expect(client.FlushDB(ctx).Err()).NotTo(HaveOccurred())
time.Sleep(1 * time.Second)
})
2 changes: 1 addition & 1 deletion tests/integration/geo_test.go
@@ -14,7 +14,7 @@ var _ = Describe("Geo Commands", func() {
var client *redis.Client

BeforeEach(func() {
client = redis.NewClient(pikaOptions1())
client = redis.NewClient(PikaOption(SINGLEADDR))
Expect(client.FlushDB(ctx).Err()).NotTo(HaveOccurred())
time.Sleep(1 * time.Second)
})
2 changes: 1 addition & 1 deletion tests/integration/hash_test.go
@@ -16,7 +16,7 @@ var _ = Describe("Hash Commands", func() {
var client *redis.Client

BeforeEach(func() {
client = redis.NewClient(pikaOptions1())
client = redis.NewClient(PikaOption(SINGLEADDR))
Expect(client.FlushDB(ctx).Err()).NotTo(HaveOccurred())
time.Sleep(1 * time.Second)
})
2 changes: 1 addition & 1 deletion tests/integration/hyperloglog_test.go
@@ -14,7 +14,7 @@ var _ = Describe("Hyperloglog Commands", func() {
var client *redis.Client

BeforeEach(func() {
client = redis.NewClient(pikaOptions1())
client = redis.NewClient(PikaOption(SINGLEADDR))
Expect(client.FlushDB(ctx).Err()).NotTo(HaveOccurred())
time.Sleep(1 * time.Second)
})
8 changes: 4 additions & 4 deletions tests/integration/list_test.go
@@ -40,7 +40,7 @@ var _ = Describe("List Commands", func() {
var blockedLock sync.Mutex

BeforeEach(func() {
client = redis.NewClient(pikaOptions1())
client = redis.NewClient(PikaOption(SINGLEADDR))
Expect(client.FlushDB(ctx).Err()).NotTo(HaveOccurred())
time.Sleep(1 * time.Second)
})
@@ -916,8 +916,8 @@ var _ = Describe("List Commands", func() {
Expect(lRange.Err()).NotTo(HaveOccurred())
Expect(lRange.Val()).To(Equal([]string{"two", "three"}))

err := client.Do(ctx, "LPOP", "list", 1, 2).Err()
Expect(err).To(MatchError(ContainSubstring("ERR wrong number of arguments for 'lpop' command")))
err := client.Do(ctx, "LPOP", "list", 1, 2).Err()
Expect(err).To(MatchError(ContainSubstring("ERR wrong number of arguments for 'lpop' command")))
})

It("should LPopCount", func() {
@@ -1162,7 +1162,7 @@ var _ = Describe("List Commands", func() {
Expect(lRange.Val()).To(Equal([]string{"one", "two"}))

err := client.Do(ctx, "RPOP", "list", 1, 2).Err()
Expect(err).To(MatchError(ContainSubstring("ERR wrong number of arguments for 'rpop' command")))
Expect(err).To(MatchError(ContainSubstring("ERR wrong number of arguments for 'rpop' command")))
})

It("should RPopCount", func() {
26 changes: 11 additions & 15 deletions tests/integration/options.go
@@ -6,6 +6,15 @@ import (
"github.com/redis/go-redis/v9"
)

const (
LOCALHOST = "127.0.0.1"
SLAVEPORT = "9231"
MASTERPORT = "9241"
SINGLEADDR = "127.0.0.1:9221"
SLAVEADDR = "127.0.0.1:9231"
MASTERADDR = "127.0.0.1:9241"
)

type TimeValue struct {
time.Time
}
@@ -15,22 +24,9 @@ func (t *TimeValue) ScanRedis(s string) (err error) {
return
}

func pikaOptions1() *redis.Options {
return &redis.Options{
Addr: "127.0.0.1:9221",
DB: 0,
DialTimeout: 10 * time.Second,
ReadTimeout: 30 * time.Second,
WriteTimeout: 30 * time.Second,
MaxRetries: -1,
PoolSize: 30,
PoolTimeout: 60 * time.Second,
}
}

func pikaOptions2() *redis.Options {
func PikaOption(addr string) *redis.Options {
return &redis.Options{
Addr: "127.0.0.1:9231",
Addr: addr,
DB: 0,
DialTimeout: 10 * time.Second,
ReadTimeout: 30 * time.Second,
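With pikaOptions1/pikaOptions2 collapsed into a single PikaOption(addr) builder, each test picks its target node through the new address constants. A short usage sketch consistent with the updated call sites in the test files; the helper function name is illustrative and not part of this commit:

// newTestClients shows how the three nodes started by start_master_and_slave.sh
// are addressed: the standalone instance on 9221, the master on 9241 and the
// slave on 9231.
func newTestClients() (single, master, slave *redis.Client) {
	single = redis.NewClient(PikaOption(SINGLEADDR))
	master = redis.NewClient(PikaOption(MASTERADDR))
	slave = redis.NewClient(PikaOption(SLAVEADDR))
	return single, master, slave
}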
4 changes: 2 additions & 2 deletions tests/integration/pubsub_test.go
@@ -21,8 +21,8 @@ var _ = Describe("PubSub", func() {
ctx := context.TODO()

BeforeEach(func() {
client = redis.NewClient(pikaOptions1())
client2 = redis.NewClient(pikaOptions1())
client = redis.NewClient(PikaOption(SINGLEADDR))
client2 = redis.NewClient(PikaOption(SINGLEADDR))
Expect(client.FlushDB(ctx).Err()).NotTo(HaveOccurred())
Expect(client2.FlushDB(ctx).Err()).NotTo(HaveOccurred())
time.Sleep(2 * time.Second)
8 changes: 4 additions & 4 deletions tests/integration/replication_test.go
@@ -371,8 +371,8 @@ var _ = Describe("should replication ", func() {
var clientMaster *redis.Client

BeforeEach(func() {
clientMaster = redis.NewClient(pikaOptions1())
clientSlave = redis.NewClient(pikaOptions2())
clientMaster = redis.NewClient(PikaOption(MASTERADDR))
clientSlave = redis.NewClient(PikaOption(SLAVEADDR))
cleanEnv(ctx, clientMaster, clientSlave)
Expect(clientSlave.FlushDB(ctx).Err()).NotTo(HaveOccurred())
Expect(clientMaster.FlushDB(ctx).Err()).NotTo(HaveOccurred())
@@ -395,11 +395,11 @@ infoRes = clientMaster.Info(ctx, "replication")
infoRes = clientMaster.Info(ctx, "replication")
Expect(infoRes.Err()).NotTo(HaveOccurred())
Expect(infoRes.Val()).To(ContainSubstring("role:master"))
Expect(clientSlave.Do(ctx, "slaveof", "127.0.0.1", "9231").Err()).To(MatchError("ERR The master ip:port and the slave ip:port are the same"))
Expect(clientSlave.Do(ctx, "slaveof", LOCALHOST, SLAVEPORT).Err()).To(MatchError("ERR The master ip:port and the slave ip:port are the same"))

var count = 0
for {
res := trySlave(ctx, clientSlave, "127.0.0.1", "9221")
res := trySlave(ctx, clientSlave, LOCALHOST, MASTERPORT)
if res {
break
} else if count > 4 {
2 changes: 1 addition & 1 deletion tests/integration/server_test.go
@@ -14,7 +14,7 @@ var _ = Describe("Server", func() {
var client *redis.Client

BeforeEach(func() {
client = redis.NewClient(pikaOptions1())
client = redis.NewClient(PikaOption(SINGLEADDR))
Expect(client.FlushDB(ctx).Err()).NotTo(HaveOccurred())
time.Sleep(1 * time.Second)
})
6 changes: 3 additions & 3 deletions tests/integration/set_test.go
@@ -14,7 +14,7 @@ var _ = Describe("Set Commands", func() {
var client *redis.Client

BeforeEach(func() {
client = redis.NewClient(pikaOptions1())
client = redis.NewClient(PikaOption(SINGLEADDR))
Expect(client.FlushDB(ctx).Err()).NotTo(HaveOccurred())
time.Sleep(1 * time.Second)
})
@@ -276,8 +276,8 @@ var _ = Describe("Set Commands", func() {
Expect(sMembers.Err()).NotTo(HaveOccurred())
Expect(sMembers.Val()).To(HaveLen(3))

err := client.Do(ctx, "SPOP", "set", 1, 2).Err()
Expect(err).To(MatchError(ContainSubstring("ERR wrong number of arguments for 'spop' command")))
err := client.Do(ctx, "SPOP", "set", 1, 2).Err()
Expect(err).To(MatchError(ContainSubstring("ERR wrong number of arguments for 'spop' command")))
})

It("should SPopN", func() {
2 changes: 1 addition & 1 deletion tests/integration/slowlog_test.go
@@ -18,7 +18,7 @@ var _ = Describe("Slowlog Commands", func() {
var client *redis.Client

BeforeEach(func() {
client = redis.NewClient(pikaOptions1())
client = redis.NewClient(PikaOption(SINGLEADDR))
Expect(client.FlushDB(ctx).Err()).NotTo(HaveOccurred())
time.Sleep(1 * time.Second)
})
9 changes: 8 additions & 1 deletion tests/integration/start_master_and_slave.sh
@@ -2,11 +2,18 @@
# This script is used by .github/workflows/pika.yml, Do not modify this file unless you know what you are doing.
# it's used to start pika master and slave, running path: build
cp ../../output/pika ./pika
cp ../conf/pika.conf ./pika_single.conf
cp ../conf/pika.conf ./pika_master.conf
cp ../conf/pika.conf ./pika_slave.conf
# Create folders for storing data on the primary and secondary nodes
mkdir master_data
mkdir slave_data
sed -i '' -e 's|databases : 1|databases : 2|' -e 's|#daemonize : yes|daemonize : yes|' ./pika_master.conf
# Change the data storage locations for the primary and secondary nodes in their configuration files
sed -i '' -e 's|databases : 1|databases : 2|' -e 's|#daemonize : yes|daemonize : yes|' ./pika_single.conf
sed -i '' -e 's|databases : 1|databases : 2|' -e 's|port : 9221|port : 9241|' -e 's|log-path : ./log/|log-path : ./master_data/log/|' -e 's|db-path : ./db/|db-path : ./master_data/db/|' -e 's|dump-path : ./dump/|dump-path : ./master_data/dump/|' -e 's|pidfile : ./pika.pid|pidfile : ./master_data/pika.pid|' -e 's|db-sync-path : ./dbsync/|db-sync-path : ./master_data/dbsync/|' -e 's|#daemonize : yes|daemonize : yes|' ./pika_master.conf
sed -i '' -e 's|databases : 1|databases : 2|' -e 's|port : 9221|port : 9231|' -e 's|log-path : ./log/|log-path : ./slave_data/log/|' -e 's|db-path : ./db/|db-path : ./slave_data/db/|' -e 's|dump-path : ./dump/|dump-path : ./slave_data/dump/|' -e 's|pidfile : ./pika.pid|pidfile : ./slave_data/pika.pid|' -e 's|db-sync-path : ./dbsync/|db-sync-path : ./slave_data/dbsync/|' -e 's|#daemonize : yes|daemonize : yes|' ./pika_slave.conf
# Start three nodes
./pika -c ./pika_single.conf
./pika -c ./pika_master.conf
./pika -c ./pika_slave.conf
#ensure both master and slave are ready