Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: flushdb may cause master-slave inconsistency #2808

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion include/pika_consensus.h
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ class ConsensusCoordinator {
pstd::Status InternalAppendLog(const std::shared_ptr<Cmd>& cmd_ptr);
pstd::Status InternalAppendBinlog(const std::shared_ptr<Cmd>& cmd_ptr);
void InternalApply(const MemLog::LogItem& log);
void InternalApplyFollower(const MemLog::LogItem& log);
void InternalApplyFollower(const std::shared_ptr<Cmd>& cmd_ptr);

pstd::Status GetBinlogOffset(const BinlogOffset& start_offset, LogOffset* log_offset);
pstd::Status GetBinlogOffset(const BinlogOffset& start_offset, const BinlogOffset& end_offset,
Expand All @@ -182,6 +182,7 @@ class ConsensusCoordinator {
pstd::Status FindLogicOffset(const BinlogOffset& start_offset, uint64_t target_index, LogOffset* found_offset);
pstd::Status GetLogsBefore(const BinlogOffset& start_offset, std::vector<LogOffset>* hints);

private:
// keep members in this class works in order
pstd::Mutex order_mu_;

Expand Down
6 changes: 3 additions & 3 deletions include/pika_repl_bgworker.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

#include <memory>
#include <string>

#include <functional>
#include "net/include/bg_thread.h"
#include "net/include/pb_conn.h"
#include "net/include/thread_pool.h"
Expand All @@ -25,13 +25,13 @@ class PikaReplBgWorker {
int StartThread();
int StopThread();
void Schedule(net::TaskFunc func, void* arg);
void QueueClear();
void Schedule(net::TaskFunc func, void* arg, std::function<void()>& call_back);
static void HandleBGWorkerWriteBinlog(void* arg);
static void HandleBGWorkerWriteDB(void* arg);
static void WriteDBInSyncWay(const std::shared_ptr<Cmd>& c_ptr);
void SetThreadName(const std::string& thread_name) {
bg_thread_.set_thread_name(thread_name);
}

BinlogItem binlog_item_;
net::RedisParser redis_parser_;
std::string ip_port_;
Expand Down
36 changes: 29 additions & 7 deletions include/pika_repl_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,8 @@ struct ReplClientWriteBinlogTaskArg {

struct ReplClientWriteDBTaskArg {
const std::shared_ptr<Cmd> cmd_ptr;
LogOffset offset;
std::string db_name;
ReplClientWriteDBTaskArg(std::shared_ptr<Cmd> _cmd_ptr, const LogOffset& _offset, std::string _db_name)
: cmd_ptr(std::move(_cmd_ptr)),
offset(_offset),
db_name(std::move(_db_name)) {}
explicit ReplClientWriteDBTaskArg(std::shared_ptr<Cmd> _cmd_ptr)
: cmd_ptr(std::move(_cmd_ptr)) {}
~ReplClientWriteDBTaskArg() = default;
};

Expand All @@ -68,7 +64,7 @@ class PikaReplClient {
void ScheduleByDBName(net::TaskFunc func, void* arg, const std::string& db_name);
void ScheduleWriteBinlogTask(const std::string& db_name, const std::shared_ptr<InnerMessage::InnerResponse>& res,
const std::shared_ptr<net::PbConn>& conn, void* res_private_data);
void ScheduleWriteDBTask(const std::shared_ptr<Cmd>& cmd_ptr, const LogOffset& offset, const std::string& db_name);
void ScheduleWriteDBTask(const std::shared_ptr<Cmd>& cmd_ptr, const std::string& db_name);

pstd::Status SendMetaSync();
pstd::Status SendDBSync(const std::string& ip, uint32_t port, const std::string& db_name,
Expand All @@ -80,6 +76,24 @@ class PikaReplClient {
const std::string& local_ip, bool is_first_send);
pstd::Status SendRemoveSlaveNode(const std::string& ip, uint32_t port, const std::string& db_name, const std::string& local_ip);

// Adds incr_step to the pending async write-DB task counter that belongs to db_name.
// The counter slot is selected by the last character of db_name, read as a decimal digit.
void IncrAsyncWriteDBTaskCount(const std::string& db_name, int32_t incr_step) {
  // back() on an empty string is undefined behavior, so reject it before indexing.
  assert(!db_name.empty());
  int32_t db_index = db_name.back() - '0';
  assert(db_index >= 0 && db_index <= 7);
  // std::memory_order_seq_cst is the spelling that is valid in both C++17 and C++20.
  async_write_db_task_counts_[db_index].fetch_add(incr_step, std::memory_order_seq_cst);
}

// Subtracts incr_step from the pending async write-DB task counter that belongs to db_name.
// The counter slot is selected by the last character of db_name, read as a decimal digit.
void DecrAsyncWriteDBTaskCount(const std::string& db_name, int32_t incr_step) {
  // back() on an empty string is undefined behavior, so reject it before indexing.
  assert(!db_name.empty());
  int32_t db_index = db_name.back() - '0';
  assert(db_index >= 0 && db_index <= 7);
  // std::memory_order_seq_cst is the spelling that is valid in both C++17 and C++20.
  async_write_db_task_counts_[db_index].fetch_sub(incr_step, std::memory_order_seq_cst);
}

// Returns the current value of the pending async write-DB task counter that belongs to db_name.
// The counter slot is selected by the last character of db_name, read as a decimal digit.
int32_t GetUnfinishedAsyncWriteDBTaskCount(const std::string& db_name) {
  // back() on an empty string is undefined behavior, so reject it before indexing.
  assert(!db_name.empty());
  int32_t db_index = db_name.back() - '0';
  assert(db_index >= 0 && db_index <= 7);
  return async_write_db_task_counts_[db_index].load(std::memory_order_seq_cst);
}

private:
size_t GetBinlogWorkerIndexByDBName(const std::string &db_name);
size_t GetHashIndexByKey(const std::string& key);
Expand All @@ -88,6 +102,14 @@ class PikaReplClient {
std::unique_ptr<PikaReplClientThread> client_thread_;
int next_avail_ = 0;
std::hash<std::string> str_hash;

// async_write_db_task_counts_ is used while consuming binlog. It holds the number of async write-DB tasks that are
// queued or being executed by the WriteDBWorkers. If a flushdb binlog needs to be applied to the DB, it must wait
// until this count drops to zero. See pika discussion #2807 for more details.
// It is only used on a slave node while consuming binlog.
std::atomic<int32_t> async_write_db_task_counts_[MAX_DB_NUM];
// [NOTICE] write_db_workers_ must be declared after async_write_db_task_counts_ so that write_db_workers_ is destroyed before async_write_db_task_counts_
// when PikaReplClient is destructed, because some of the async tasks executed by write_db_workers_ manipulate async_write_db_task_counts_
std::vector<std::unique_ptr<PikaReplBgWorker>> write_binlog_workers_;
std::vector<std::unique_ptr<PikaReplBgWorker>> write_db_workers_;
};
Expand Down
6 changes: 5 additions & 1 deletion include/pika_rm.h
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ class PikaReplicaManager {
void ScheduleWriteBinlogTask(const std::string& db_name,
const std::shared_ptr<InnerMessage::InnerResponse>& res,
const std::shared_ptr<net::PbConn>& conn, void* res_private_data);
void ScheduleWriteDBTask(const std::shared_ptr<Cmd>& cmd_ptr, const LogOffset& offset, const std::string& db_name);
void ScheduleWriteDBTask(const std::shared_ptr<Cmd>& cmd_ptr, const std::string& db_name);
void ScheduleReplClientBGTaskByDBName(net::TaskFunc , void* arg, const std::string &db_name);
void ReplServerRemoveClientConn(int fd);
void ReplServerUpdateClientConnMap(const std::string& ip_port, int fd);
Expand All @@ -205,6 +205,10 @@ class PikaReplicaManager {
return sync_slave_dbs_;
}

// Returns the unfinished async write-DB task count for db_name by forwarding the query
// to pika_repl_client_.
int32_t GetUnfinishedAsyncWriteDBTaskCount(const std::string& db_name) {
return pika_repl_client_->GetUnfinishedAsyncWriteDBTaskCount(db_name);
}

private:
void InitDB();
pstd::Status SelectLocalIp(const std::string& remote_ip, int remote_port, std::string* local_ip);
Expand Down
17 changes: 13 additions & 4 deletions src/net/include/bg_thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

#include <atomic>
#include <queue>

#include <functional>
#include "net/include/net_thread.h"

#include "pstd/include/pstd_mutex.h"
Expand Down Expand Up @@ -41,7 +41,7 @@ class BGThread final : public Thread {
}

void Schedule(void (*function)(void*), void* arg);

void Schedule(void (*function)(void*), void* arg, std::function<void()>& call_back);
/*
* timeout is in milliseconds
*/
Expand All @@ -52,13 +52,22 @@ class BGThread final : public Thread {
void SwallowReadyTasks();

private:
struct BGItem {
// One queued background task: the function to run, its opaque argument, and an
// optional callback that fires when the item itself is destroyed.
class BGItem {
 public:
  void (*function)(void*);
  void* arg;
  // Optional callback; when set, it is invoked from ~BGItem().
  std::function<void()> dtor_call_back;

  BGItem(void (*_function)(void*), void* _arg) : function(_function), arg(_arg) {}
  BGItem(void (*_function)(void*), void* _arg, std::function<void()>& _dtor_call_back)
      : function(_function), arg(_arg), dtor_call_back(_dtor_call_back) {}

  // The destructor has a side effect, so a copy would run dtor_call_back a second
  // time. Forbid copying; items are meant to be owned through a single handle.
  BGItem(const BGItem&) = delete;
  BGItem& operator=(const BGItem&) = delete;

  ~BGItem() {
    if (dtor_call_back) {
      dtor_call_back();
    }
  }
};

std::queue<BGItem> queue_;
std::queue<std::unique_ptr<BGItem>> queue_;
std::priority_queue<TimerItem> timer_queue_;

size_t full_;
Expand Down
27 changes: 17 additions & 10 deletions src/net/src/bg_thread.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,9 @@
// of patent rights can be found in the PATENTS file in the same directory.

#include "net/include/bg_thread.h"
#include <sys/time.h>
#include <cstdlib>
#include <mutex>

#include "pstd/include/pstd_mutex.h"
#include "pstd/include/xdebug.h"

namespace net {

void BGThread::Schedule(void (*function)(void*), void* arg) {
Expand All @@ -19,11 +15,22 @@ void BGThread::Schedule(void (*function)(void*), void* arg) {
wsignal_.wait(lock, [this]() { return queue_.size() < full_ || should_stop(); });

if (!should_stop()) {
queue_.emplace(function, arg);
queue_.emplace(std::make_unique<BGItem>(function, arg));
rsignal_.notify_one();
}
}

// Queues function(arg) for the background thread, bundled with call_back.
// call_back is copied into the queued BGItem and is invoked when that item is destroyed.
// Blocks while the queue is full, unless the thread has been asked to stop.
void BGThread::Schedule(void (*function)(void*), void* arg, std::function<void()>& call_back) {
  std::unique_lock lock(mu_);

  // Wait for free capacity, or give up waiting once a stop has been requested.
  wsignal_.wait(lock, [this]() { return queue_.size() < full_ || should_stop(); });

  // NOTE(review): when should_stop() is true the task is dropped without being queued,
  // so call_back is never stored or invoked and arg is not released here — confirm that
  // callers tolerate this.
  if (!should_stop()) {
    queue_.emplace(std::make_unique<BGItem>(function, arg, call_back));
    rsignal_.notify_one();
  }
}

void BGThread::QueueSize(int* pri_size, int* qu_size) {
std::lock_guard lock(mu_);
*pri_size = static_cast<int32_t>(timer_queue_.size());
Expand All @@ -32,7 +39,7 @@ void BGThread::QueueSize(int* pri_size, int* qu_size) {

void BGThread::QueueClear() {
std::lock_guard lock(mu_);
std::queue<BGItem>().swap(queue_);
std::queue<std::unique_ptr<BGItem>>().swap(queue_);
std::priority_queue<TimerItem>().swap(timer_queue_);
wsignal_.notify_one();
}
Expand All @@ -42,10 +49,10 @@ void BGThread::SwallowReadyTasks() {
// while the schedule function would stop to add any tasks.
mu_.lock();
while (!queue_.empty()) {
auto [function, arg] = queue_.front();
std::unique_ptr<BGItem> task_item = std::move(queue_.front());
queue_.pop();
mu_.unlock();
(*function)(arg);
task_item->function(task_item->arg);
mu_.lock();
}
mu_.unlock();
Expand Down Expand Up @@ -96,11 +103,11 @@ void* BGThread::ThreadMain() {
}

if (!queue_.empty()) {
auto [function, arg] = queue_.front();
std::unique_ptr<BGItem> task_item = std::move(queue_.front());
queue_.pop();
wsignal_.notify_one();
lock.unlock();
(*function)(arg);
task_item->function(task_item->arg);
}
}
// swallow all the remaining tasks in the ready and timer queues
Expand Down
27 changes: 22 additions & 5 deletions src/pika_consensus.cc
Original file line number Diff line number Diff line change
Expand Up @@ -346,9 +346,26 @@ Status ConsensusCoordinator::ProcessLeaderLog(const std::shared_ptr<Cmd>& cmd_pt
return Status::OK();
}

Status s = InternalAppendLog(cmd_ptr);

InternalApplyFollower(MemLog::LogItem(LogOffset(), cmd_ptr, nullptr, nullptr));
auto opt = cmd_ptr->argv()[0];
if (pstd::StringToLower(opt) != kCmdNameFlushdb) {
// apply binlog in sync way
Status s = InternalAppendLog(cmd_ptr);
// apply db in async way
InternalApplyFollower(cmd_ptr);
} else {
// this is a flushdb-binlog, both apply binlog and apply db are in sync way
// ensure all write-DB tasks submitted earlier have finished before we execute this flushdb
int32_t wait_ms = 250;
while (g_pika_rm->GetUnfinishedAsyncWriteDBTaskCount(db_name_) > 0) {
std::this_thread::sleep_for(std::chrono::milliseconds(wait_ms));
wait_ms *= 2;
wait_ms = wait_ms < 3000 ? wait_ms : 3000;
}
// apply flushdb-binlog in sync way
Status s = InternalAppendLog(cmd_ptr);
// applyDB in sync way
PikaReplBgWorker::WriteDBInSyncWay(cmd_ptr);
}
return Status::OK();
}

Expand Down Expand Up @@ -406,8 +423,8 @@ uint32_t ConsensusCoordinator::term() {
return term_;
}

void ConsensusCoordinator::InternalApplyFollower(const MemLog::LogItem& log) {
g_pika_rm->ScheduleWriteDBTask(log.cmd_ptr, log.offset, db_name_);
// Hands cmd_ptr to the replica manager's write-DB scheduler, tagged with this
// coordinator's db_name_.
void ConsensusCoordinator::InternalApplyFollower(const std::shared_ptr<Cmd>& cmd_ptr) {
g_pika_rm->ScheduleWriteDBTask(cmd_ptr, db_name_);
}

int ConsensusCoordinator::InitCmd(net::RedisParser* parser, const net::RedisCmdArgsType& argv) {
Expand Down
10 changes: 7 additions & 3 deletions src/pika_repl_bgworker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@ int PikaReplBgWorker::StopThread() { return bg_thread_.StopThread(); }

void PikaReplBgWorker::Schedule(net::TaskFunc func, void* arg) { bg_thread_.Schedule(func, arg); }

void PikaReplBgWorker::QueueClear() { bg_thread_.QueueClear(); }
// Schedules func(arg) on this worker's background thread together with call_back;
// all three arguments are forwarded unchanged to bg_thread_.Schedule.
void PikaReplBgWorker::Schedule(net::TaskFunc func, void* arg, std::function<void()>& call_back) {
bg_thread_.Schedule(func, arg, call_back);
}

void PikaReplBgWorker::ParseBinlogOffset(const InnerMessage::BinlogOffset& pb_offset, LogOffset* offset) {
offset->b_offset.filenum = pb_offset.filenum();
Expand Down Expand Up @@ -207,9 +209,11 @@ int PikaReplBgWorker::HandleWriteBinlog(net::RedisParser* parser, const net::Red
void PikaReplBgWorker::HandleBGWorkerWriteDB(void* arg) {
std::unique_ptr<ReplClientWriteDBTaskArg> task_arg(static_cast<ReplClientWriteDBTaskArg*>(arg));
const std::shared_ptr<Cmd> c_ptr = task_arg->cmd_ptr;
WriteDBInSyncWay(c_ptr);
}

void PikaReplBgWorker::WriteDBInSyncWay(const std::shared_ptr<Cmd>& c_ptr) {
const PikaCmdArgsType& argv = c_ptr->argv();
LogOffset offset = task_arg->offset;
std::string db_name = task_arg->db_name;

uint64_t start_us = 0;
if (g_pika_conf->slowlog_slower_than() >= 0) {
Expand Down
17 changes: 12 additions & 5 deletions src/pika_repl_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,10 @@ using pstd::Status;
extern PikaServer* g_pika_server;
extern std::unique_ptr<PikaReplicaManager> g_pika_rm;

PikaReplClient::PikaReplClient(int cron_interval, int keepalive_timeout) {
PikaReplClient::PikaReplClient(int cron_interval, int keepalive_timeout) {
for (int i = 0; i < MAX_DB_NUM; i++) {
async_write_db_task_counts_[i].store(0, std::memory_order::memory_order_seq_cst);
}
client_thread_ = std::make_unique<PikaReplClientThread>(cron_interval, keepalive_timeout);
client_thread_->set_thread_name("PikaReplClient");
for (int i = 0; i < g_pika_conf->sync_binlog_thread_num(); i++) {
Expand Down Expand Up @@ -98,13 +101,17 @@ void PikaReplClient::ScheduleWriteBinlogTask(const std::string& db_name,
write_binlog_workers_[index]->Schedule(&PikaReplBgWorker::HandleBGWorkerWriteBinlog, static_cast<void*>(task_arg));
}

void PikaReplClient::ScheduleWriteDBTask(const std::shared_ptr<Cmd>& cmd_ptr, const LogOffset& offset,
const std::string& db_name) {
void PikaReplClient::ScheduleWriteDBTask(const std::shared_ptr<Cmd>& cmd_ptr, const std::string& db_name) {
const PikaCmdArgsType& argv = cmd_ptr->argv();
std::string dispatch_key = argv.size() >= 2 ? argv[1] : argv[0];
size_t index = GetHashIndexByKey(dispatch_key);
auto task_arg = new ReplClientWriteDBTaskArg(cmd_ptr, offset, db_name);
write_db_workers_[index]->Schedule(&PikaReplBgWorker::HandleBGWorkerWriteDB, static_cast<void*>(task_arg));
auto task_arg = new ReplClientWriteDBTaskArg(cmd_ptr);

IncrAsyncWriteDBTaskCount(db_name, 1);
std::function<void()> task_finish_call_back = [this, db_name]() { this->DecrAsyncWriteDBTaskCount(db_name, 1); };

write_db_workers_[index]->Schedule(&PikaReplBgWorker::HandleBGWorkerWriteDB, static_cast<void*>(task_arg),
task_finish_call_back);
cheniujh marked this conversation as resolved.
Show resolved Hide resolved
}

size_t PikaReplClient::GetBinlogWorkerIndexByDBName(const std::string &db_name) {
Expand Down
5 changes: 2 additions & 3 deletions src/pika_rm.cc
Original file line number Diff line number Diff line change
Expand Up @@ -683,9 +683,8 @@ void PikaReplicaManager::ScheduleWriteBinlogTask(const std::string& db,
pika_repl_client_->ScheduleWriteBinlogTask(db, res, conn, res_private_data);
}

void PikaReplicaManager::ScheduleWriteDBTask(const std::shared_ptr<Cmd>& cmd_ptr, const LogOffset& offset,
const std::string& db_name) {
pika_repl_client_->ScheduleWriteDBTask(cmd_ptr, offset, db_name);
// Forwards the write-DB task for cmd_ptr on db_name to pika_repl_client_.
void PikaReplicaManager::ScheduleWriteDBTask(const std::shared_ptr<Cmd>& cmd_ptr, const std::string& db_name) {
pika_repl_client_->ScheduleWriteDBTask(cmd_ptr, db_name);
}

void PikaReplicaManager::ReplServerRemoveClientConn(int fd) { pika_repl_server_->RemoveClientConn(fd); }
Expand Down
Loading