Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: delete binglog dead code #2274

Merged
merged 1 commit into from
Jan 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 2 additions & 5 deletions include/pika_consensus.h
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,6 @@ class ConsensusCoordinator {
// invoked by dbsync process
pstd::Status Reset(const LogOffset& offset);

pstd::Status ProposeLog(const std::shared_ptr<Cmd>& cmd_ptr, std::shared_ptr<PikaClientConn> conn_ptr,
std::shared_ptr<std::string> resp_ptr);
pstd::Status ProposeLog(const std::shared_ptr<Cmd>& cmd_ptr);
pstd::Status UpdateSlave(const std::string& ip, int port, const LogOffset& start, const LogOffset& end);
pstd::Status AddSlaveNode(const std::string& ip, int port, int session_id);
Expand Down Expand Up @@ -184,9 +182,8 @@ class ConsensusCoordinator {
bool MatchConsensusLevel();
pstd::Status TruncateTo(const LogOffset& offset);

pstd::Status InternalAppendLog(const BinlogItem& item, const std::shared_ptr<Cmd>& cmd_ptr,
std::shared_ptr<PikaClientConn> conn_ptr, std::shared_ptr<std::string> resp_ptr);
pstd::Status InternalAppendBinlog(const BinlogItem& item, const std::shared_ptr<Cmd>& cmd_ptr, LogOffset* log_offset);
pstd::Status InternalAppendLog(const std::shared_ptr<Cmd>& cmd_ptr);
pstd::Status InternalAppendBinlog(const std::shared_ptr<Cmd>& cmd_ptr);
void InternalApply(const MemLog::LogItem& log);
void InternalApplyFollower(const MemLog::LogItem& log);

Expand Down
2 changes: 0 additions & 2 deletions include/pika_rm.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,6 @@ class SyncMasterSlot : public SyncSlot {

// consensus use
pstd::Status ConsensusUpdateSlave(const std::string& ip, int port, const LogOffset& start, const LogOffset& end);
pstd::Status ConsensusProposeLog(const std::shared_ptr<Cmd>& cmd_ptr, std::shared_ptr<PikaClientConn> conn_ptr,
std::shared_ptr<std::string> resp_ptr);
pstd::Status ConsensusProposeLog(const std::shared_ptr<Cmd>& cmd_ptr);
pstd::Status ConsensusSanityCheck();
pstd::Status ConsensusProcessLeaderLog(const std::shared_ptr<Cmd>& cmd_ptr, const BinlogItem& attribute);
Expand Down
3 changes: 1 addition & 2 deletions src/pika_command.cc
Original file line number Diff line number Diff line change
Expand Up @@ -943,8 +943,7 @@ void Cmd::DoBinlog(const std::shared_ptr<SyncMasterSlot>& slot) {
return;
}

Status s =
slot->ConsensusProposeLog(shared_from_this(), std::dynamic_pointer_cast<PikaClientConn>(conn_ptr), resp_ptr);
Status s = slot->ConsensusProposeLog(shared_from_this());
if (!s.ok()) {
LOG(WARNING) << slot->SyncSlotInfo().ToString() << " Writing binlog failed, maybe no space left on device "
<< s.ToString();
Expand Down
36 changes: 9 additions & 27 deletions src/pika_consensus.cc
Original file line number Diff line number Diff line change
Expand Up @@ -344,20 +344,16 @@ Status ConsensusCoordinator::Reset(const LogOffset& offset) {
return Status::OK();
}

Status ConsensusCoordinator::ProposeLog(const std::shared_ptr<Cmd>& cmd_ptr, std::shared_ptr<PikaClientConn> conn_ptr,
std::shared_ptr<std::string> resp_ptr) {
Status ConsensusCoordinator::ProposeLog(const std::shared_ptr<Cmd>& cmd_ptr) {
std::vector<std::string> keys = cmd_ptr->current_key();
// slotkey shouldn't add binlog
if (cmd_ptr->name() == kCmdNameSAdd && !keys.empty() &&
(keys[0].compare(0, SlotKeyPrefix.length(), SlotKeyPrefix) == 0 || keys[0].compare(0, SlotTagPrefix.length(), SlotTagPrefix) == 0)) {
return Status::OK();
}

LogOffset log_offset;

BinlogItem item;
// make sure stable log and mem log consistent
Status s = InternalAppendLog(item, cmd_ptr, std::move(conn_ptr), std::move(resp_ptr));
Status s = InternalAppendLog(cmd_ptr);
if (!s.ok()) {
return s;
}
Expand All @@ -366,19 +362,8 @@ Status ConsensusCoordinator::ProposeLog(const std::shared_ptr<Cmd>& cmd_ptr, std
return Status::OK();
}

Status ConsensusCoordinator::ProposeLog(const std::shared_ptr<Cmd>& cmd_ptr) {
return ProposeLog(cmd_ptr, nullptr, nullptr);
}

Status ConsensusCoordinator::InternalAppendLog(const BinlogItem& item, const std::shared_ptr<Cmd>& cmd_ptr,
std::shared_ptr<PikaClientConn> conn_ptr,
std::shared_ptr<std::string> resp_ptr) {
LogOffset log_offset;
Status s = InternalAppendBinlog(item, cmd_ptr, &log_offset);
if (!s.ok()) {
return s;
}
return Status::OK();
Status ConsensusCoordinator::InternalAppendLog(const std::shared_ptr<Cmd>& cmd_ptr) {
return InternalAppendBinlog(cmd_ptr);
}

// precheck if prev_offset match && drop this log if this log exist
Expand All @@ -390,7 +375,7 @@ Status ConsensusCoordinator::ProcessLeaderLog(const std::shared_ptr<Cmd>& cmd_pt
return Status::OK();
}

Status s = InternalAppendLog(attribute, cmd_ptr, nullptr, nullptr);
Status s = InternalAppendLog(cmd_ptr);

InternalApplyFollower(MemLog::LogItem(LogOffset(), cmd_ptr, nullptr, nullptr));
return Status::OK();
Expand All @@ -407,8 +392,7 @@ Status ConsensusCoordinator::UpdateSlave(const std::string& ip, int port, const
return Status::OK();
}

Status ConsensusCoordinator::InternalAppendBinlog(const BinlogItem& item, const std::shared_ptr<Cmd>& cmd_ptr,
LogOffset* log_offset) {
Status ConsensusCoordinator::InternalAppendBinlog(const std::shared_ptr<Cmd>& cmd_ptr) {
std::string content = cmd_ptr->ToRedisProtocol();
Status s = stable_logger_->Logger()->Put(content);
if (!s.ok()) {
Expand All @@ -419,11 +403,9 @@ Status ConsensusCoordinator::InternalAppendBinlog(const BinlogItem& item, const
}
return s;
}
uint32_t filenum;
uint64_t offset;
stable_logger_->Logger()->GetProducerStatus(&filenum, &offset);
*log_offset = LogOffset(BinlogOffset(filenum, offset), LogicOffset(item.term_id(), item.logic_id()));
return Status::OK();
uint32_t filenum = 0;
uint64_t offset = 0;
return stable_logger_->Logger()->GetProducerStatus(&filenum, &offset);
}

Status ConsensusCoordinator::ScheduleApplyLog(const LogOffset& committed_index) {
Expand Down
6 changes: 0 additions & 6 deletions src/pika_rm.cc
Original file line number Diff line number Diff line change
Expand Up @@ -477,16 +477,10 @@ bool SyncMasterSlot::CheckSessionId(const std::string& ip, int port, const std::
return true;
}

Status SyncMasterSlot::ConsensusProposeLog(const std::shared_ptr<Cmd>& cmd_ptr, std::shared_ptr<PikaClientConn> conn_ptr,
std::shared_ptr<std::string> resp_ptr) {
return coordinator_.ProposeLog(cmd_ptr, std::move(conn_ptr), std::move(resp_ptr));
}

Status SyncMasterSlot::ConsensusProposeLog(const std::shared_ptr<Cmd>& cmd_ptr) {
return coordinator_.ProposeLog(cmd_ptr);
}


Status SyncMasterSlot::ConsensusProcessLeaderLog(const std::shared_ptr<Cmd>& cmd_ptr, const BinlogItem& attribute) {
return coordinator_.ProcessLeaderLog(cmd_ptr, attribute);
}
Expand Down
Loading