Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

test: fix multi bug and compatible ACL test #2815

Merged
merged 8 commits into from
Jul 30, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions include/pika_client_conn.h
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,7 @@ class PikaClientConn : public net::RedisConn {
void AddKeysToWatch(const std::vector<std::string>& db_keys);
void RemoveWatchedKeys();
void SetTxnFailedFromKeys(const std::vector<std::string>& db_keys);
void SetAllTxnFailed();
void SetTxnFailedFromDBs(std::string db_name);
void SetTxnFailedIfKeyExists(const std::string target_db_name = "");
void ExitTxn();
bool IsInTxn();
bool IsTxnInitFailed();
Expand Down
2 changes: 1 addition & 1 deletion pikatests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ function setup_pika_bin {
exit 1
fi
cp $PIKA_BIN src/redis-server
cp conf/pika.conf tests/assets/default.conf
cp tests/conf/pika.conf tests/assets/default.conf
}


Expand Down
65 changes: 38 additions & 27 deletions src/pika_client_conn.cc
Original file line number Diff line number Diff line change
Expand Up @@ -186,28 +186,29 @@ std::shared_ptr<Cmd> PikaClientConn::DoCmd(const PikaCmdArgsType& argv, const st
}
}

// Process Command
c_ptr->Execute();
time_stat_->process_done_ts_ = pstd::NowMicros();
auto cmdstat_map = g_pika_cmd_table_manager->GetCommandStatMap();
(*cmdstat_map)[opt].cmd_count.fetch_add(1);
(*cmdstat_map)[opt].cmd_time_consuming.fetch_add(time_stat_->total_time());

if (c_ptr->res().ok() && c_ptr->is_write() && name() != kCmdNameExec) {
LOG(INFO) << c_ptr->name();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

日志删一下

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

if (c_ptr->name() == kCmdNameFlushdb) {
auto flushdb = std::dynamic_pointer_cast<FlushdbCmd>(c_ptr);
SetTxnFailedFromDBs(flushdb->GetFlushDBname());
SetTxnFailedIfKeyExists(flushdb->GetFlushDBname());
} else if (c_ptr->name() == kCmdNameFlushall) {
SetAllTxnFailed();
SetTxnFailedIfKeyExists();
} else {
auto table_keys = c_ptr->current_key();
for (auto& key : table_keys) {
key = c_ptr->db_name().append(key);
key = c_ptr->db_name().append("_").append(key);
}
SetTxnFailedFromKeys(table_keys);
}
}

// Process Command
c_ptr->Execute();
time_stat_->process_done_ts_ = pstd::NowMicros();
auto cmdstat_map = g_pika_cmd_table_manager->GetCommandStatMap();
(*cmdstat_map)[opt].cmd_count.fetch_add(1);
(*cmdstat_map)[opt].cmd_time_consuming.fetch_add(time_stat_->total_time());

if (g_pika_conf->slowlog_slower_than() >= 0) {
ProcessSlowlog(argv, c_ptr->GetDoDuration());
}
Expand Down Expand Up @@ -386,32 +387,42 @@ void PikaClientConn::SetTxnFailedFromKeys(const std::vector<std::string>& db_key
auto involved_conns = std::vector<std::shared_ptr<NetConn>>{};
involved_conns = dispatcher->GetInvolvedTxn(db_keys);
for (auto& conn : involved_conns) {
if (auto c = std::dynamic_pointer_cast<PikaClientConn>(conn); c != nullptr && c.get() != this) {
if (auto c = std::dynamic_pointer_cast<PikaClientConn>(conn); c != nullptr) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里为啥去掉c.get() != this?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里为啥去掉c.get() != this?

如果一个client watch 了一个 key,就算这个 key 是被这个 client 自己修改的,这个 watch 也会失效。redis 是这个逻辑,所以去除了和这个判断

c->SetTxnWatchFailState(true);
}
}
}
}

void PikaClientConn::SetAllTxnFailed() {
// if key in target_db exists, then the key been watched multi will be failed
void PikaClientConn::SetTxnFailedIfKeyExists(std::string target_db_name) {
auto dispatcher = dynamic_cast<net::DispatchThread*>(server_thread());
if (dispatcher != nullptr) {
auto involved_conns = dispatcher->GetAllTxns();
for (auto& conn : involved_conns) {
if (auto c = std::dynamic_pointer_cast<PikaClientConn>(conn); c != nullptr && c.get() != this) {
c->SetTxnWatchFailState(true);
}
}
if (dispatcher == nullptr) {
return;
}
}
auto involved_conns = dispatcher->GetAllTxns();
for (auto& conn : involved_conns) {
std::shared_ptr<PikaClientConn> c;
if (c = std::dynamic_pointer_cast<PikaClientConn>(conn); c == nullptr) {
continue;
}

void PikaClientConn::SetTxnFailedFromDBs(std::string db_name) {
auto dispatcher = dynamic_cast<net::DispatchThread*>(server_thread());
if (dispatcher != nullptr) {
auto involved_conns = dispatcher->GetDBTxns(db_name);
for (auto& conn : involved_conns) {
if (auto c = std::dynamic_pointer_cast<PikaClientConn>(conn); c != nullptr && c.get() != this) {
c->SetTxnWatchFailState(true);
for (const auto& db_key : c->watched_db_keys_) {
size_t pos = db_key.find('_');
if (pos == std::string::npos) {
continue;
}

auto db_name = db_key.substr(0, pos);
auto key = db_key.substr(pos + 1);

if (target_db_name == "" || target_db_name == "all" || target_db_name == db_name) {
auto db = g_pika_server->GetDB(db_name);
// if watched key exists, set watch state to failed
if (db->storage()->Exists({key}) > 0) {
c->SetTxnWatchFailState(true);
break;
}
}
}
}
Expand Down
6 changes: 3 additions & 3 deletions src/pika_transaction.cc
Original file line number Diff line number Diff line change
Expand Up @@ -57,14 +57,14 @@ void ExecCmd::Do() {
if (cmd->name() == kCmdNameFlushall) {
auto flushall = std::dynamic_pointer_cast<FlushallCmd>(cmd);
flushall->FlushAllWithoutLock();
client_conn->SetAllTxnFailed();
client_conn->SetTxnFailedIfKeyExists();
} else if (cmd->name() == kCmdNameFlushdb) {
auto flushdb = std::dynamic_pointer_cast<FlushdbCmd>(cmd);
flushdb->FlushAllDBsWithoutLock();
if (cmd->res().ok()) {
cmd->res().SetRes(CmdRes::kOk);
}
client_conn->SetTxnFailedFromDBs(each_cmd_info.db_->GetDBName());
client_conn->SetTxnFailedIfKeyExists(each_cmd_info.db_->GetDBName());
} else {
cmd->Do();
if (cmd->res().ok() && cmd->is_write()) {
Expand Down Expand Up @@ -258,7 +258,7 @@ void WatchCmd::DoInitial() {
size_t pos = 1;
while (pos < argv_.size()) {
keys_.emplace_back(argv_[pos]);
db_keys_.push_back(db_name() + argv_[pos++]);
db_keys_.push_back(db_name() + "_" +argv_[pos++]);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

代码格式 + argv_[pos++]

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

代码格式 + argv_[pos++]

这个下划线是特意加上去的,作为 db_key 字符串内容的一部分

}
}

Expand Down
27 changes: 23 additions & 4 deletions tests/assets/default.conf
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,17 @@ slow-cmd-thread-pool-size : 1
# Slow cmd list e.g. hgetall, mset
slow-cmd-list :

# The number of sync-thread for data replication from master, those are the threads work on slave nodes
# and are used to execute commands sent from master node when replicating.
# The number of threads to write DB in slaveNode when replicating.
# It's preferable to set slave's sync-thread-num value close to master's thread-pool-size.
sync-thread-num : 6

# The num of threads to write binlog in slaveNode when replicating,
# each DB cloud only bind to one sync-binlog-thread to write binlog in maximum
#[NOTICE] It's highly recommended to set sync-binlog-thread-num equal to conf item 'database'(then each DB cloud have a exclusive thread to write binlog),
# eg. if you use 8 DBs(databases_ is 8), sync-binlog-thread-num is preferable to be 8
# Valid range of sync-binlog-thread-num is [1, databases], the final value of it is Min(sync-binlog-thread-num, databases)
sync-binlog-thread-num : 1

# Directory to store log files of Pika, which contains multiple types of logs,
# Including: INFO, WARNING, ERROR log, as well as binglog(write2fine) file which
# is used for replication.
Expand Down Expand Up @@ -101,6 +108,8 @@ instance-mode : classic
# The default database id is DB 0. You can select a different one on
# a per-connection by using SELECT. The db id range is [0, 'databases' value -1].
# The value range of this parameter is [1, 8].
# [NOTICE] It's RECOMMENDED to set sync-binlog-thread-num equal to DB num(databases),
# if you've changed the value of databases, remember to check if the value of sync-binlog-thread-num is proper.
databases : 1

# The number of followers of a master. Only [0, 1, 2, 3, 4] is valid at present.
Expand Down Expand Up @@ -308,6 +317,11 @@ max-write-buffer-num : 2
# whether the key exists. Setting this value too high may hurt performance.
min-write-buffer-number-to-merge : 1

# The total size of wal files, when reaches this limit, rocksdb will force the flush of column-families
# whose memtables are backed by the oldest live WAL file. Also used to control the rocksdb open time when
# process restart.
max-total-wal-size : 1073741824

# rocksdb level0_stop_writes_trigger
level0-stop-writes-trigger : 36

Expand Down Expand Up @@ -466,9 +480,14 @@ default-slot-num : 1024
# The cache will be sharded into 2^blob-num-shard-bits shards.
# blob-num-shard-bits : -1

# Rsync Rate limiting configuration 200MB/s
# Rsync Rate limiting configuration [Default value is 200MB/s]
# [USED BY SLAVE] The transmitting speed(Rsync Rate) In full replication is controlled BY SLAVE NODE, You should modify the throttle-bytes-per-second in slave's pika.conf if you wanna change the rsync rate limit.
# [Dynamic Change Supported] send command 'config set throttle-bytes-per-second new_value' to SLAVE NODE can dynamically adjust rsync rate during full sync(use config rewrite can persist the changes).
throttle-bytes-per-second : 207200000

# Rsync timeout in full sync stage[Default value is 1000 ms], unnecessary retries will happen if this value is too small.
# [Dynamic Change Supported] similar to throttle-bytes-per-second, rsync-timeout-ms can be dynamically changed by configset command
# [USED BY SLAVE] Similar to throttle-bytes-per-second, you should change rsync-timeout-ms's value in slave's conf file if it is needed to adjust.
rsync-timeout-ms : 1000
# The valid range for max-rsync-parallel-num is [1, 4].
# If an invalid value is provided, max-rsync-parallel-num will automatically be reset to 4.
max-rsync-parallel-num : 4
Expand Down
Loading
Loading