Skip to content

Commit

Permalink
feature:add txn for pika(OpenAtomFoundation#1446)
Browse files Browse the repository at this point in the history
todo:test txn. Just to verify the feasibility of the program.
Signed-off-by: Hao Lee <[email protected]>
  • Loading branch information
ForestLH committed Jul 16, 2023
1 parent b9e92ee commit a24b9ec
Show file tree
Hide file tree
Showing 4 changed files with 65 additions and 55 deletions.
34 changes: 19 additions & 15 deletions include/pika_client_conn.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,13 @@ class PikaClientConn : public net::RedisConn {
//! InitCmdFailed指的是初始化某个任务的时候失败了
// WatchFailed指的是在watch之后,某个客户端修改了此事务watch的key
//!值得注意的是,watch的key是有db之分的
enum class TxnState { None, Start, InitCmdFailed, WatchFailed };
class TxnStateBitMask {
public:
static constexpr uint8_t Start = 0;
static constexpr uint8_t InitCmdFailed = 1;
static constexpr uint8_t WatchFailed = 2;
static constexpr uint8_t Execing = 3; // exec执行中
};

// Auth related
class AuthStat {
Expand All @@ -46,9 +52,7 @@ class PikaClientConn : public net::RedisConn {

PikaClientConn(int fd, const std::string& ip_port, net::Thread* server_thread, net::NetMultiplexer* mpx,
const net::HandleType& handle_type, int max_conn_rbuf_size);
~PikaClientConn() override {
LOG(INFO) << "lee : " << __FUNCTION__ << " " << String();
}
~PikaClientConn() override {}

void ProcessRedisCmds(const std::vector<net::RedisCmdArgsType>& argvs, bool async,
std::string* response) override;
Expand All @@ -62,27 +66,26 @@ class PikaClientConn : public net::RedisConn {
void SetIsPubSub(bool is_pubsub) { is_pubsub_ = is_pubsub; }
void SetCurrentDb(const std::string& db_name) { current_db_ = db_name; }
void SetWriteCompleteCallback(WriteCompleteCallback cb) { write_completed_cb_ = std::move(cb); }

// Txn
void PushCmdToQue(std::shared_ptr<Cmd> cmd);
void SetTxnState(TxnState state);
std::vector<CmdRes> ExecTxnCmds();
std::shared_ptr<Cmd> PopCmdFromQue();
void ClearTxnCmdQue();
bool IsInTxn();
bool IsTxnFailed();
bool IsTxnInitFailed();
bool IsTxnWatchFailed();
void SetTxnWatchFailState(bool is_failed);
void SetTxnInitFailState(bool is_failed);
void SetTxnStartState(bool is_start);

void AddKeysToWatch(const std::vector<std::string> &db_keys);
void RemoveWatchedKeys();
void SetTxnFailedFromKeys(const std::vector<std::string> &table_keys = {});
std::vector<std::string> GetTxnInvolvedDbs() { return txn_exec_dbs_; }
std::mutex& GetTxnDbMutex() { return txn_db_mu_; }
void ExitTxn() {
if (IsInTxn()) {
RemoveWatchedKeys();
ClearTxnCmdQue();
SetTxnState(TxnState::None);
}
}
void ExitTxn();

net::ServerThread* server_thread() { return server_thread_; }

Expand All @@ -97,11 +100,12 @@ class PikaClientConn : public net::RedisConn {
WriteCompleteCallback write_completed_cb_;
bool is_pubsub_ = false;
std::queue<std::shared_ptr<Cmd>> txn_cmd_que_; // redis事务的队列
TxnState txn_state_{TxnState::None}; // 事务的状态
std::bitset<16> txn_state_; // class TxnStateBitMask
std::unordered_set<std::string> watched_db_keys_;
std::vector<std::string> txn_exec_dbs_;
std::mutex txn_mu_;
std::mutex txn_db_mu_;
std::mutex txn_state_mu_; // 用于锁事务状态
std::mutex txn_db_mu_; // 在执行事务的时候,采用加db锁的方式加锁,那么就会有多个db被加锁,那么就会有加锁顺序的问题,所以加了这把大锁
// 在void Cmd::ProcessExecCmd();中使用这把锁

std::shared_ptr<Cmd> DoCmd(const PikaCmdArgsType& argv, const std::string& opt,
const std::shared_ptr<std::string>& resp_ptr);
Expand Down
3 changes: 3 additions & 0 deletions include/pika_command.h
Original file line number Diff line number Diff line change
Expand Up @@ -464,6 +464,9 @@ class Cmd : public std::enable_shared_from_this<Cmd> {
bool is_multi_slot() const;
bool HashtagIsConsistent(const std::string& lhs, const std::string& rhs) const;
uint64_t GetDoDuration() const { return do_duration_; };
void SetDbName(const std::string& db_name) {
db_name_ = db_name;
}

std::string name() const;
CmdRes& res();
Expand Down
51 changes: 34 additions & 17 deletions src/pika_client_conn.cc
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ std::shared_ptr<Cmd> PikaClientConn::DoCmd(const PikaCmdArgsType& argv, const st
std::shared_ptr<Cmd> tmp_ptr = std::make_shared<DummyCmd>(DummyCmd());
tmp_ptr->res().SetRes(CmdRes::kErrOther, "unknown command \"" + opt + "\"");
if (IsInTxn()) {
SetTxnState(TxnState::InitCmdFailed); // 本次事务就算是失败了
SetTxnInitFailState(true); // 本次事务就算是失败了
}
return tmp_ptr;
}
Expand Down Expand Up @@ -70,7 +70,7 @@ std::shared_ptr<Cmd> PikaClientConn::DoCmd(const PikaCmdArgsType& argv, const st
c_ptr->Initial(argv, current_db_);
if (!c_ptr->res().ok()) {
if (IsInTxn()) {
SetTxnState(TxnState::InitCmdFailed); // 本次事务就算是失败了
SetTxnInitFailState(true); // 本次事务就算是失败了
}
return c_ptr;
}
Expand Down Expand Up @@ -301,37 +301,46 @@ void PikaClientConn::PushCmdToQue(std::shared_ptr<Cmd> cmd) {
txn_exec_dbs_.emplace_back(cmd->db_name());
}


void PikaClientConn::SetTxnState(TxnState state) {
std::lock_guard<std::mutex> lg(txn_mu_);
txn_state_ = state;
}

bool PikaClientConn::IsInTxn() {
std::lock_guard<std::mutex> lg(txn_mu_);
return txn_state_ != TxnState::None;
std::lock_guard<std::mutex> lg(txn_state_mu_);
return txn_state_[TxnStateBitMask::Start];
}

bool PikaClientConn::IsTxnFailed() {
std::lock_guard<std::mutex> lg(txn_mu_);
return txn_state_ == TxnState::InitCmdFailed || txn_state_ == TxnState::WatchFailed;
std::lock_guard<std::mutex> lg(txn_state_mu_);
return txn_state_[TxnStateBitMask::WatchFailed] | txn_state_[TxnStateBitMask::InitCmdFailed];
}
bool PikaClientConn::IsTxnInitFailed() {
std::lock_guard<std::mutex> lg(txn_mu_);
return txn_state_ == TxnState::InitCmdFailed;
std::lock_guard<std::mutex> lg(txn_state_mu_);
return txn_state_[TxnStateBitMask::InitCmdFailed];
}

bool PikaClientConn::IsTxnWatchFailed() {
std::lock_guard<std::mutex> lg(txn_mu_);
return txn_state_ == TxnState::WatchFailed;
std::lock_guard<std::mutex> lg(txn_state_mu_);
return txn_state_[TxnStateBitMask::WatchFailed];
}
void PikaClientConn::SetTxnWatchFailState(bool is_failed) {
std::lock_guard<std::mutex> lg(txn_state_mu_);
txn_state_[TxnStateBitMask::WatchFailed] = is_failed;
}
void PikaClientConn::SetTxnInitFailState(bool is_failed) {
std::lock_guard<std::mutex> lg(txn_state_mu_);
txn_state_[TxnStateBitMask::InitCmdFailed] = is_failed;
}

void PikaClientConn::SetTxnStartState(bool is_start) {
std::lock_guard<std::mutex> lg(txn_state_mu_);
txn_state_[TxnStateBitMask::Start] = is_start;
}


std::vector<CmdRes> PikaClientConn::ExecTxnCmds() {
auto ret_res = std::vector<CmdRes>{};
while (!txn_cmd_que_.empty()) {
auto cmd = txn_cmd_que_.front();
txn_cmd_que_.pop();
cmd->res().SetRes(CmdRes::CmdRet::kNone);
cmd->SetDbName(current_db_);
cmd->ProcessSingleSlotCmd();
if (cmd->res().ok() && cmd->is_write()) {
auto db_keys = cmd->current_key();
Expand Down Expand Up @@ -391,11 +400,19 @@ void PikaClientConn::SetTxnFailedFromKeys(const std::vector<std::string> &table_
}
for (auto &conn : involved_conns) {
if (auto c = std::dynamic_pointer_cast<PikaClientConn>(conn); c != nullptr && c.get() != this) {
c->SetTxnState(TxnState::WatchFailed);
c->SetTxnWatchFailState(true);
}
}
}
}
void PikaClientConn::ExitTxn() {
if (IsInTxn()) {
RemoveWatchedKeys();
ClearTxnCmdQue();
std::lock_guard<std::mutex> lg(txn_state_mu_);
txn_state_.reset();
}
}


void PikaClientConn::ExecRedisCmd(const PikaCmdArgsType& argv, const std::shared_ptr<std::string>& resp_ptr) {
Expand Down
32 changes: 9 additions & 23 deletions src/pika_transaction.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ void MultiCmd::Do(std::shared_ptr<Slot> partition) {
res_.SetRes(CmdRes::kErrOther, "ERR MULTI calls can not be nested");
return;
}
client_conn->SetTxnState(PikaClientConn::TxnState::Start);
client_conn->SetTxnStartState(true);
res_.SetRes(CmdRes::kOk);
}

Expand All @@ -39,23 +39,17 @@ void ExecCmd::Do(std::shared_ptr<Slot> slot) {
return;
}
if (!client_conn->IsInTxn()) {
res_.SetRes(CmdRes::kErrOther, "EXECABORT Transaction discarded because of previous errors.");
res_.SetRes(CmdRes::kErrOther, "ERR EXEC without MULTI");
return;
}

if (client_conn->IsTxnInitFailed()) {
res_.SetRes(CmdRes::kErrOther, "EXECABORT Transaction discarded because of previous errors.");
// client_conn->RemoveWatchedKeys();
// client_conn->SetTxnState(PikaClientConn::TxnState::None);
// client_conn->ClearTxnCmdQue();
res_.SetRes(CmdRes::kErrOther, "EXEC ABORT Transaction discarded because of previous errors.");
client_conn->ExitTxn();
return;
}
if (client_conn->IsTxnWatchFailed()) {
res_.AppendStringLen(-1);
// client_conn->RemoveWatchedKeys();
// client_conn->SetTxnState(PikaClientConn::TxnState::None);
// client_conn->ClearTxnCmdQue();
client_conn->ExitTxn();
return;
}
Expand All @@ -70,12 +64,10 @@ void ExecCmd::Do(std::shared_ptr<Slot> slot) {
for (auto & cmd_re : cmd_res) {
res_.AppendStringRaw(cmd_re.message());
}
// client_conn->RemoveWatchedKeys();
// client_conn->SetTxnState(PikaClientConn::TxnState::None);
client_conn->ExitTxn();
}

//! 在这里还没法得到涉及到的key,因为
//! 在这里还没法得到涉及到的key,因为客户端连接对象中有一些的key他们的db不一样
void ExecCmd::DoInitial() {
if (!CheckArg(argv_.size())) {
res_.SetRes(CmdRes::kWrongNum, name());
Expand Down Expand Up @@ -104,7 +96,7 @@ void WatchCmd::Do(std::shared_ptr<Slot> slot) {
slot->db()->Exists(keys_, &mp);
if (mp.size() > 1) {
// 说明一个key里面有多种类型
res_.SetRes(CmdRes::CmdRet::kErrOther, "watch key must be unique");
res_.SetRes(CmdRes::CmdRet::kErrOther, "EXEC WATCH watch key must be unique");
return;
}

Expand Down Expand Up @@ -137,9 +129,6 @@ void WatchCmd::DoInitial() {
}

//NOTE(leeHao): 在redis中,如果unwatch出现在队列之中,其实不会生效
//TODO(leeHao): 得给TxnState变成位运算的操作,因为错误可以有两个,init fail或者是watch fail,并存,
//并且,unwatch的时候,如果当前不在exec当中,所以,TxnState还得加一个,exec当中,
// 如果不在exec当中,执行的unwatch,那么之前如果被设置成为了watch fail,现在得将其的这个watch fail标志给删除。
void UnwatchCmd::Do(std::shared_ptr<Slot> slot) {
auto conn = GetConn();
auto client_conn = std::dynamic_pointer_cast<PikaClientConn>(conn);
Expand All @@ -155,9 +144,9 @@ void UnwatchCmd::Do(std::shared_ptr<Slot> slot) {
return;
}
client_conn->RemoveWatchedKeys();
// if (client_conn->IsTxnInitFailed()) {
// client_conn->SetTxnState(PikaClientConn::TxnState::);
// }
if (client_conn->IsTxnWatchFailed()) {
client_conn->SetTxnWatchFailState(false);
}
res_.SetRes(CmdRes::CmdRet::kOk);
}

Expand All @@ -183,12 +172,9 @@ void DiscardCmd::Do(std::shared_ptr<Slot> partition) {
return;
}
if (!client_conn->IsInTxn()) {
res_.SetRes(CmdRes::kErrOther, "DISCARD without MULTI");
res_.SetRes(CmdRes::kErrOther, "ERR DISCARD without MULTI");
return;
}
// client_conn->RemoveWatchedKeys();
// client_conn->SetTxnState(PikaClientConn::TxnState::None);
// client_conn->ClearTxnCmdQue();
client_conn->ExitTxn();
res_.SetRes(CmdRes::CmdRet::kOk);
}

0 comments on commit a24b9ec

Please sign in to comment.