Skip to content

Commit

Permalink
revised based on reviewer's opinion
Browse files Browse the repository at this point in the history
  • Loading branch information
cheniujh committed Jul 29, 2024
1 parent 943b94e commit bd465ca
Show file tree
Hide file tree
Showing 4 changed files with 8 additions and 11 deletions.
12 changes: 5 additions & 7 deletions include/pika_consensus.h
Original file line number Diff line number Diff line change
Expand Up @@ -164,14 +164,12 @@ class ConsensusCoordinator {
return tmp_stream.str();
}

void IncrUnfinishedAsyncWriteDbTaskCount(int32_t step_size) {
unfinished_async_write_db_task_count_.fetch_add(step_size, std::memory_order::memory_order_seq_cst);
LOG(INFO) << "incur 1, curr:" << unfinished_async_write_db_task_count_.load();
void IncrAsyncWriteDBTaskCount(int32_t step_size) {
async_write_db_task_count_.fetch_add(step_size, std::memory_order::memory_order_seq_cst);
}

void DecrUnfinishedAsyncWriteDbTaskCount(int32_t step_size) {
unfinished_async_write_db_task_count_.fetch_sub(step_size, std::memory_order::memory_order_seq_cst);
LOG(INFO) << "decr 1, curr:" << unfinished_async_write_db_task_count_.load();
void DecrAsyncWriteDBTaskCount(int32_t step_size) {
async_write_db_task_count_.fetch_sub(step_size, std::memory_order::memory_order_seq_cst);
}

private:
Expand Down Expand Up @@ -213,6 +211,6 @@ class ConsensusCoordinator {
//queued or being executing by WriteDBWorkers. If a flushdb-binlog need to apply DB, it must wait
//util this count drop to zero. you can also check pika discussion #2807 to know more
//it is only used in slaveNode when comsuming binlog
std::atomic<int32_t> unfinished_async_write_db_task_count_{0};
std::atomic<int32_t> async_write_db_task_count_{0};
};
#endif // INCLUDE_PIKA_CONSENSUS_H_
2 changes: 1 addition & 1 deletion include/pika_repl_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ class PikaReplClient {
std::hash<std::string> str_hash;
std::vector<std::unique_ptr<PikaReplBgWorker>> write_binlog_workers_;
//[NOTICE] the task queue of WriteDBWorker must never be deliberately cleared,
// because their queue size are related with ConsensusCoordinator::unfinished_async_write_db_task_count_
// because their queue size are related with ConsensusCoordinator::async_write_db_task_count_
// check PR # /disscussion #2807 to know more
std::vector<std::unique_ptr<PikaReplBgWorker>> write_db_workers_;
};
Expand Down
2 changes: 1 addition & 1 deletion src/pika_consensus.cc
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,7 @@ Status ConsensusCoordinator::ProcessLeaderLog(const std::shared_ptr<Cmd>& cmd_pt
// this is a flushdb-binlog, both apply binlog and apply db are in sync way
// ensure all writeDB task that submitted before has finished before we exec this flushdb
int32_t wait_ms = 250;
while (unfinished_async_write_db_task_count_.load(std::memory_order::memory_order_seq_cst) > 0) {
while (async_write_db_task_count_.load(std::memory_order::memory_order_seq_cst) > 0) {
std::this_thread::sleep_for(std::chrono::milliseconds(wait_ms));
wait_ms *= 2;
wait_ms = wait_ms < 3000 ? wait_ms : 3000;
Expand Down
3 changes: 1 addition & 2 deletions src/pika_rm.cc
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,7 @@ std::string SyncDB::DBName() {
/* SyncMasterDB*/

SyncMasterDB::SyncMasterDB(const std::string& db_name)
: SyncDB(db_name), coordinator_(db_name) {
}
: SyncDB(db_name), coordinator_(db_name) {}

int SyncMasterDB::GetNumberOfSlaveNode() { return coordinator_.SyncPros().SlaveSize(); }

Expand Down

0 comments on commit bd465ca

Please sign in to comment.