fix(replication): didn't resume the db status after restarting full sync (#2549)

Currently, the pre_fullsync_cb stops the task runner and sets the DB loading status to yes,
but it never resumes those states. This leaves the server stuck in the restoring state until
it successfully resyncs from the master. To fix this, we call the post_fullsync_cb to resume
those states before restarting full sync.

This PR also uses try_lock so that the replication thread can be stopped while preparing to restore the DB.
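
In short, the invariant this commit enforces: any state changed by pre_fullsync_cb (task runner stopped, loading flag set) must be restored by post_fullsync_cb on every exit path, including the failure paths that restart full sync. A minimal sketch of that contract, using hypothetical names (run_full_sync, pre_cb, post_cb, fetch_and_restore) that are not part of the kvrocks API:

#include <functional>

enum class SyncResult { Done, Restart };

// Sketch: the pre/post callbacks must stay balanced on every exit path.
SyncResult run_full_sync(const std::function<bool()> &pre_cb,
                         const std::function<void()> &post_cb,
                         const std::function<bool()> &fetch_and_restore) {
  // pre_cb now returns bool so the replication thread can bail out
  // (e.g. the server is stopping) instead of blocking inside the callback.
  if (!pre_cb()) return SyncResult::Restart;
  if (!fetch_and_restore()) {
    post_cb();  // resume the task runner / clear the loading flag first
    return SyncResult::Restart;
  }
  post_cb();  // also resume on success
  return SyncResult::Done;
}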
git-hulk authored Sep 21, 2024
1 parent a7dbd74 commit f402be0
Showing 5 changed files with 32 additions and 18 deletions.
14 changes: 9 additions & 5 deletions src/cluster/replication.cc
@@ -337,7 +337,7 @@ ReplicationThread::ReplicationThread(std::string host, uint32_t port, Server *srv
CallbackType{CallbacksStateMachine::WRITE, "fullsync write", &ReplicationThread::fullSyncWriteCB},
CallbackType{CallbacksStateMachine::READ, "fullsync read", &ReplicationThread::fullSyncReadCB}}) {}

- Status ReplicationThread::Start(std::function<void()> &&pre_fullsync_cb, std::function<void()> &&post_fullsync_cb) {
+ Status ReplicationThread::Start(std::function<bool()> &&pre_fullsync_cb, std::function<void()> &&post_fullsync_cb) {
pre_fullsync_cb_ = std::move(pre_fullsync_cb);
post_fullsync_cb_ = std::move(post_fullsync_cb);

@@ -700,25 +700,28 @@ ReplicationThread::CBState ReplicationThread::fullSyncReadCB(bufferevent *bev) {
fullsync_state_ = kFetchMetaID;
LOG(INFO) << "[replication] Succeeded fetching full data files info, fetching files in parallel";

+ bool pre_fullsync_done = false;
// If 'slave-empty-db-before-fullsync' is yes, we call 'pre_fullsync_cb_'
// just like reloading database. And we don't want slave to occupy too much
// disk space, so we just empty entire database rudely.
if (srv_->GetConfig()->slave_empty_db_before_fullsync) {
- pre_fullsync_cb_();
+ if (!pre_fullsync_cb_()) return CBState::RESTART;
+ pre_fullsync_done = true;
storage_->EmptyDB();
}

repl_state_.store(kReplFetchSST, std::memory_order_relaxed);
auto s = parallelFetchFile(target_dir, meta.files);
if (!s.IsOK()) {
+ if (pre_fullsync_done) post_fullsync_cb_();
LOG(ERROR) << "[replication] Failed to parallel fetch files while " + s.Msg();
return CBState::RESTART;
}
LOG(INFO) << "[replication] Succeeded fetching files in parallel, restoring the backup";

// Restore DB from backup
- // We already call 'pre_fullsync_cb_' if 'slave-empty-db-before-fullsync' is yes
- if (!srv_->GetConfig()->slave_empty_db_before_fullsync) pre_fullsync_cb_();
+ // Don't need to call 'pre_fullsync_cb_' again if it was called before
+ if (!pre_fullsync_done && !pre_fullsync_cb_()) return CBState::RESTART;

// For old version, master uses rocksdb backup to implement data snapshot
if (srv_->GetConfig()->master_use_repl_port) {
s = storage_->RestoreFromBackup();
Expand All @@ -727,6 +730,7 @@ ReplicationThread::CBState ReplicationThread::fullSyncReadCB(bufferevent *bev) {
}
if (!s.IsOK()) {
LOG(ERROR) << "[replication] Failed to restore backup while " + s.Msg() + ", restart fullsync";
+ post_fullsync_cb_();
return CBState::RESTART;
}
LOG(INFO) << "[replication] Succeeded restoring the backup, fullsync was finish";
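
A note on the pre_fullsync_done flag above: pre_fullsync_cb_ can fire at two different points (before EmptyDB, or just before restoring), and once it has fired, post_fullsync_cb_ must run on every later exit path. The same invariant could be expressed with a small scope guard; this is only a sketch of that alternative, not what the commit does:

#include <functional>
#include <utility>

// Minimal scope guard: runs the stored callback when it goes out of scope.
class ScopeGuard {
 public:
  explicit ScopeGuard(std::function<void()> fn) : fn_(std::move(fn)) {}
  ~ScopeGuard() {
    if (fn_) fn_();
  }
  ScopeGuard(const ScopeGuard &) = delete;
  ScopeGuard &operator=(const ScopeGuard &) = delete;

 private:
  std::function<void()> fn_;
};

// Usage sketch inside a fullSyncReadCB-like function:
//   if (!pre_fullsync_cb_()) return CBState::RESTART;  // guard not armed yet
//   ScopeGuard resume(post_fullsync_cb_);  // fires on every return below
//   if (!fetch_ok) return CBState::RESTART;  // post callback runs here
//   ...restore...; return CBState::NEXT;     // and here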
5 changes: 3 additions & 2 deletions src/cluster/replication.h
@@ -95,8 +95,9 @@ class FeedSlaveThread {
class ReplicationThread : private EventCallbackBase<ReplicationThread> {
public:
explicit ReplicationThread(std::string host, uint32_t port, Server *srv);
- Status Start(std::function<void()> &&pre_fullsync_cb, std::function<void()> &&post_fullsync_cb);
+ Status Start(std::function<bool()> &&pre_fullsync_cb, std::function<void()> &&post_fullsync_cb);
void Stop();
+ bool IsStopped() const { return stop_flag_; }
ReplState State() { return repl_state_.load(std::memory_order_relaxed); }
int64_t LastIOTimeSecs() const { return last_io_time_secs_.load(std::memory_order_relaxed); }

@@ -159,7 +160,7 @@ class ReplicationThread : private EventCallbackBase<ReplicationThread> {
bool next_try_old_psync_ = false;
bool next_try_without_announce_ip_address_ = false;

- std::function<void()> pre_fullsync_cb_;
+ std::function<bool()> pre_fullsync_cb_;
std::function<void()> post_fullsync_cb_;

// Internal states managed by FullSync procedure
28 changes: 18 additions & 10 deletions src/server/server.cc
@@ -281,7 +281,7 @@ Status Server::AddMaster(const std::string &host, uint32_t port, bool force_reco
if (GetConfig()->master_use_repl_port) master_listen_port += 1;

replication_thread_ = std::make_unique<ReplicationThread>(host, master_listen_port, this);
- auto s = replication_thread_->Start([this]() { PrepareRestoreDB(); },
+ auto s = replication_thread_->Start([this]() { return PrepareRestoreDB(); },
[this]() {
this->is_loading_ = false;
if (auto s = task_runner_.Start(); !s) {
@@ -1336,18 +1336,11 @@ std::string Server::GetRocksDBStatsJson() const {
// This function is called by replication thread when finished fetching all files from its master.
// Before restoring the db from backup or checkpoint, we should
// guarantee other threads don't access DB and its column families, then close db.
- void Server::PrepareRestoreDB() {
+ bool Server::PrepareRestoreDB() {
// Stop feeding slaves thread
LOG(INFO) << "[server] Disconnecting slaves...";
DisconnectSlaves();

- // Stop task runner
- LOG(INFO) << "[server] Stopping the task runner and clear task queue...";
- task_runner_.Cancel();
- if (auto s = task_runner_.Join(); !s) {
-   LOG(WARNING) << "[server] " << s.Msg();
- }
-
// If the DB is restored, the object 'db_' will be destroyed, but
// 'db_' will be accessed in data migration task. To avoid wrong
// accessing, data migration task should be stopped before restoring DB
@@ -1362,12 +1355,27 @@ void Server::PrepareRestoreDB() {
// ASAP to avoid user can't receive responses for long time, because the following
// 'CloseDB' may cost much time to acquire DB mutex.
LOG(INFO) << "[server] Waiting workers for finishing executing commands...";
- { auto exclusivity = WorkExclusivityGuard(); }
+ while (!works_concurrency_rw_lock_.try_lock()) {
+   if (replication_thread_->IsStopped()) {
+     is_loading_ = false;
+     return false;
+   }
+   usleep(1000);
+ }
+ works_concurrency_rw_lock_.unlock();
+
+ // Stop task runner
+ LOG(INFO) << "[server] Stopping the task runner and clear task queue...";
+ task_runner_.Cancel();
+ if (auto s = task_runner_.Join(); !s) {
+   LOG(WARNING) << "[server] " << s.Msg();
+ }

// Cron thread, compaction checker thread, full synchronization thread
// may always run in the background, we need to close db, so they don't actually work.
LOG(INFO) << "[server] Waiting for closing DB...";
storage->CloseDB();
+ return true;
}

void Server::WaitNoMigrateProcessing() {
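
The core of the server.cc change is the polling loop: instead of blocking on WorkExclusivityGuard (where Stop() can't interrupt the wait), PrepareRestoreDB now polls try_lock and re-checks whether the replication thread was stopped between attempts. A self-contained sketch of the pattern with generic names (WaitForExclusive, mu, stopped; kvrocks uses its own lock and flag members):

#include <atomic>
#include <chrono>
#include <mutex>
#include <thread>

// Wait until the lock can be taken exclusively (i.e. all workers have
// drained), but give up early if `stopped` flips to true.
bool WaitForExclusive(std::mutex &mu, const std::atomic<bool> &stopped) {
  while (!mu.try_lock()) {
    if (stopped.load(std::memory_order_relaxed)) return false;
    // Brief back-off so the polling loop doesn't spin at full speed.
    std::this_thread::sleep_for(std::chrono::milliseconds(1));
  }
  // Like the original WorkExclusivityGuard scope, we only needed to
  // observe exclusivity once; release immediately.
  mu.unlock();
  return true;
}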
2 changes: 1 addition & 1 deletion src/server/server.h
@@ -245,7 +245,7 @@ class Server {
std::string GetRocksDBStatsJson() const;
ReplState GetReplicationState();

- void PrepareRestoreDB();
+ bool PrepareRestoreDB();
void WaitNoMigrateProcessing();
Status AsyncCompactDB(const std::string &begin_key = "", const std::string &end_key = "");
Status AsyncBgSaveDB();
1 change: 1 addition & 0 deletions src/storage/storage.cc
@@ -482,6 +482,7 @@ Status Storage::RestoreFromCheckpoint() {
// Clean old backups and checkpoints because server will work on the new db
PurgeOldBackups(0, 0);
rocksdb::DestroyDB(config_->checkpoint_dir, rocksdb::Options());
+ rocksdb::DestroyDB(tmp_dir, rocksdb::Options());

// Maybe there is no database directory
auto s = env_->CreateDirIfMissing(config_->db_dir);
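
The storage.cc change adds one cleanup call so the temporary restore directory is destroyed along with the old checkpoint before the server starts working on the new DB. rocksdb::DestroyDB is the stock RocksDB API for this; a small sketch of the cleanup step (the paths and helper name are illustrative, not the kvrocks code):

#include <string>

#include <rocksdb/db.h>
#include <rocksdb/options.h>

// Remove leftover RocksDB directories so a failed or interrupted restore
// can't leave stale SST files behind at paths the server will reuse.
void CleanupRestoreDirs(const std::string &checkpoint_dir,
                        const std::string &tmp_dir) {
  rocksdb::Options opts;
  // Destroying a directory that no longer exists is harmless here, so
  // the returned statuses are intentionally ignored, as in the commit.
  rocksdb::DestroyDB(checkpoint_dir, opts);
  rocksdb::DestroyDB(tmp_dir, opts);
}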
