Skip to content

Commit

Permalink
fix:some conf load error (#2561)
Browse files Browse the repository at this point in the history
* fix:slotmigrate failed when some keys migrate failed

---------

Co-authored-by: chejinge <[email protected]>
  • Loading branch information
chejinge and brother-jin committed Apr 3, 2024
1 parent bff739b commit 578c356
Show file tree
Hide file tree
Showing 4 changed files with 102 additions and 20 deletions.
7 changes: 5 additions & 2 deletions include/pika_conf.h
Original file line number Diff line number Diff line change
Expand Up @@ -492,16 +492,19 @@ class PikaConf : public pstd::BaseConf {
TryPushDiffCommands("masterauth", value);
masterauth_ = value;
}
void SetSlotMigrate(const std::string& value) {
void SetSlotMigrate(const bool value) {
std::lock_guard l(rwlock_);
slotmigrate_ = (value == "yes");
TryPushDiffCommands("slotmigrate", value ? "yes" : "no");
slotmigrate_.store(value);
}
void SetSlotMigrateThreadNum(const int value) {
std::lock_guard l(rwlock_);
TryPushDiffCommands("slotmigrate-thread-num", std::to_string(value));
slotmigrate_thread_num_ = value;
}
void SetThreadMigrateKeysNum(const int value) {
std::lock_guard l(rwlock_);
TryPushDiffCommands("thread-migrate-keys-num", std::to_string(value));
thread_migrate_keys_num_ = value;
}
void SetExpireLogsNums(const int value) {
Expand Down
29 changes: 19 additions & 10 deletions src/pika_admin.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1682,13 +1682,13 @@ void ConfigCmd::ConfigGet(std::string& ret) {
EncodeString(&config_body, g_pika_conf->slotmigrate() ? "yes" : "no");
}

if (pstd::stringmatch(pattern.data(), "slotmigrate-thread-num", 1)) {
if (pstd::stringmatch(pattern.data(), "slotmigrate-thread-num", 1)!= 0) {
elements += 2;
EncodeString(&config_body, "slotmigrate-thread-num");
EncodeNumber(&config_body, g_pika_conf->slotmigrate_thread_num());
}

if (pstd::stringmatch(pattern.data(), "thread-migrate-keys-num", 1)) {
if (pstd::stringmatch(pattern.data(), "thread-migrate-keys-num", 1)!= 0) {
elements += 2;
EncodeString(&config_body, "thread-migrate-keys-num");
EncodeNumber(&config_body, g_pika_conf->thread_migrate_keys_num());
Expand Down Expand Up @@ -2231,9 +2231,6 @@ void ConfigCmd::ConfigSet(std::shared_ptr<DB> db) {
} else if (set_item == "masterauth") {
g_pika_conf->SetMasterAuth(value);
res_.AppendStringRaw("+OK\r\n");
} else if (set_item == "slotmigrate") {
g_pika_conf->SetSlotMigrate(value);
res_.AppendStringRaw("+OK\r\n");
} else if (set_item == "dump-prefix") {
g_pika_conf->SetBgsavePrefix(value);
res_.AppendStringRaw("+OK\r\n");
Expand Down Expand Up @@ -2282,19 +2279,19 @@ void ConfigCmd::ConfigSet(std::shared_ptr<DB> db) {
res_.AppendStringRaw("+OK\r\n");
} else if (set_item == "slotmigrate-thread-num") {
if ((pstd::string2int(value.data(), value.size(), &ival) == 0) || ival <= 0) {
res_.AppendStringRaw("-ERR Invalid argument \'" + value + "\' for CONFIG SET 'expire-logs-nums'\r\n");
res_.AppendStringRaw("-ERR Invalid argument \'" + value + "\' for CONFIG SET 'slotmigrate-thread-num'\r\n");
return;
}
long int migrate_thread_num = (0 > ival || 24 < ival) ? 8 : ival;
g_pika_conf->SetSlotMigrateThreadNum(static_cast<int>(ival));
long int migrate_thread_num = (1 > ival || 24 < ival) ? 8 : ival;
g_pika_conf->SetSlotMigrateThreadNum(migrate_thread_num);
res_.AppendStringRaw("+OK\r\n");
} else if (set_item == "thread-migrate-keys-num") {
if ((pstd::string2int(value.data(), value.size(), &ival) == 0) || ival <= 0) {
res_.AppendStringRaw("-ERR Invalid argument \'" + value + "\' for CONFIG SET 'expire-logs-nums'\r\n");
res_.AppendStringRaw("-ERR Invalid argument \'" + value + "\' for CONFIG SET 'thread-migrate-keys-num'\r\n");
return;
}
long int thread_migrate_keys_num = (8 > ival || 128 < ival) ? 64 : ival;
g_pika_conf->SetThreadMigrateKeysNum(static_cast<int>(ival));
g_pika_conf->SetThreadMigrateKeysNum(thread_migrate_keys_num);
res_.AppendStringRaw("+OK\r\n");
} else if (set_item == "slowlog-write-errorlog") {
bool is_write_errorlog;
Expand All @@ -2308,6 +2305,18 @@ void ConfigCmd::ConfigSet(std::shared_ptr<DB> db) {
}
g_pika_conf->SetSlowlogWriteErrorlog(is_write_errorlog);
res_.AppendStringRaw("+OK\r\n");
} else if (set_item == "slotmigrate") {
bool slotmigrate;
if (value == "yes") {
slotmigrate = true;
} else if (value == "no") {
slotmigrate = false;
} else {
res_.AppendStringRaw( "-ERR Invalid argument \'" + value + "\' for CONFIG SET 'slotmigrate'\r\n");
return;
}
g_pika_conf->SetSlotMigrate(slotmigrate);
res_.AppendStringRaw("+OK\r\n");
} else if (set_item == "slowlog-log-slower-than") {
if ((pstd::string2int(value.data(), value.size(), &ival) == 0) || ival < 0) {
res_.AppendStringRaw("-ERR Invalid argument \'" + value + "\' for CONFIG SET 'slowlog-log-slower-than'\r\n");
Expand Down
15 changes: 7 additions & 8 deletions src/pika_conf.cc
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,9 @@ int PikaConf::Load() {
slowlog_write_errorlog_.store(swe == "yes" ? true : false);

// slot migrate
std::string smgrt = "no";
std::string smgrt;
GetConfStr("slotmigrate", &smgrt);
slotmigrate_ = (smgrt == "yes") ? true : false;
slotmigrate_.store(smgrt == "yes" ? true : false);

int binlog_writer_num = 1;
GetConfInt("binlog-writer-num", &binlog_writer_num);
Expand Down Expand Up @@ -280,13 +280,13 @@ int PikaConf::Load() {

// arena_block_size
GetConfInt64Human("slotmigrate-thread-num", &slotmigrate_thread_num_);
if (slotmigrate_thread_num_ < 0 || slotmigrate_thread_num_ > 24) {
if (slotmigrate_thread_num_ < 1 || slotmigrate_thread_num_ > 24) {
slotmigrate_thread_num_ = 8; // 1/8 of the write_buffer_size_
}

// arena_block_size
GetConfInt64Human("thread-migrate-keys-num", &thread_migrate_keys_num_);
if (thread_migrate_keys_num_ < 64 || thread_migrate_keys_num_ > 128) {
if (thread_migrate_keys_num_ < 8 || thread_migrate_keys_num_ > 128) {
thread_migrate_keys_num_ = 64; // 1/8 of the write_buffer_size_
}

Expand Down Expand Up @@ -687,13 +687,12 @@ int PikaConf::ConfigRewrite() {
SetConfInt("max-write-buffer-num", max_write_buffer_num_);
SetConfInt64("write-buffer-size", write_buffer_size_);
SetConfInt64("arena-block-size", arena_block_size_);
SetConfInt64("slotmigrate", slotmigrate_);
SetConfStr("slotmigrate", slotmigrate_.load() ? "yes" : "no");
SetConfInt64("slotmigrate-thread-num", slotmigrate_thread_num_);
SetConfInt64("thread-migrate-keys-num", thread_migrate_keys_num_);
// slaveof config item is special
SetConfStr("slaveof", slaveof_);
// cache config
SetConfStr("share-block-cache", share_block_cache_ ? "yes" : "no");
SetConfInt("block-size", block_size_);
SetConfInt("block-cache", block_cache_);
SetConfStr("cache-index-and-filter-blocks", cache_index_and_filter_blocks_ ? "yes" : "no");
SetConfInt("cache-model", cache_model_);
SetConfInt("zset-cache-start-direction", zset_cache_start_direction_);
Expand Down
71 changes: 71 additions & 0 deletions tests/integration/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -333,12 +333,83 @@ var _ = Describe("Server", func() {
//Expect(configSet.Val()).To(Equal("OK"))
})

It("should ConfigGet", func() {
configGet := client.ConfigGet(ctx, "slotmigrate")
Expect(configGet.Err()).NotTo(HaveOccurred())
Expect(configGet.Val()).To(Equal(map[string]string{"slotmigrate": "no"}))
})

It("should ConfigSet", func() {
configSet := client.ConfigSet(ctx, "slotmigrate", "yes")
Expect(configSet.Err()).NotTo(HaveOccurred())
Expect(configSet.Val()).To(Equal("OK"))
})

It("should ConfigGet", func() {
configGet1 := client.ConfigGet(ctx, "slotmigrate-thread-num")
Expect(configGet1.Err()).NotTo(HaveOccurred())
Expect(configGet1.Val()).NotTo(Equal("0"))
})

It("should ConfigGet", func() {
configGet2 := client.ConfigGet(ctx, "thread-migrate-keys-num")
Expect(configGet2.Err()).NotTo(HaveOccurred())
Expect(configGet2.Val()).NotTo(Equal("0"))
})

It("should ConfigSet", func() {
configSet1 := client.ConfigSet(ctx, "slotmigrate-thread-num", "4")
Expect(configSet1.Err()).NotTo(HaveOccurred())
Expect(configSet1.Val()).To(Equal("OK"))
})

It("should ConfigSet", func() {
configSet2 := client.ConfigSet(ctx, "thread-migrate-keys-num", "64")
Expect(configSet2.Err()).NotTo(HaveOccurred())
Expect(configSet2.Val()).To(Equal("OK"))
})

It("should ConfigGet", func() {
configGet2 := client.ConfigGet(ctx, "block-cache")
Expect(configGet2.Err()).NotTo(HaveOccurred())
Expect(configGet2.Val()).NotTo(Equal("0"))
})

It("should ConfigRewrite", func() {
configRewrite := client.ConfigRewrite(ctx)
Expect(configRewrite.Err()).NotTo(HaveOccurred())
Expect(configRewrite.Val()).To(Equal("OK"))
})

It("should ConfigGet", func() {
configGet3 := client.ConfigGet(ctx, "block-cache")
Expect(configGet3.Err()).NotTo(HaveOccurred())
Expect(configGet3.Val()).To(Equal(map[string]string{"block-cache": "8388608"}))
})

It("should ConfigGet", func() {
configGet4 := client.ConfigGet(ctx, "slotmigrate-thread-num")
Expect(configGet4.Err()).NotTo(HaveOccurred())
Expect(configGet4.Val()).To(Equal(map[string]string{"slotmigrate-thread-num": "4"}))
})

It("should ConfigGet", func() {
configGet5 := client.ConfigGet(ctx, "thread-migrate-keys-num")
Expect(configGet5.Err()).NotTo(HaveOccurred())
Expect(configGet5.Val()).To(Equal(map[string]string{"thread-migrate-keys-num": "64"}))
})

It("should ConfigSet", func() {
configSet := client.ConfigSet(ctx, "slotmigrate", "no")
Expect(configSet.Err()).NotTo(HaveOccurred())
Expect(configSet.Val()).To(Equal("OK"))
})

It("should ConfigRewrite", func() {
configRewrite := client.ConfigRewrite(ctx)
Expect(configRewrite.Err()).NotTo(HaveOccurred())
Expect(configRewrite.Val()).To(Equal("OK"))
})
//It("should DBSize", func() {
// Expect(client.Set(ctx, "key", "value", 0).Val()).To(Equal("OK"))
// Expect(client.Do(ctx, "info", "keyspace", "1").Err()).NotTo(HaveOccurred())
Expand Down

0 comments on commit 578c356

Please sign in to comment.