Skip to content

Commit

Permalink
test: fix multi bug and compatible ACL test (#2815)
Browse files Browse the repository at this point in the history
* add multi.tcl

* Commented test cases in Tcl that cannot pass

* fix multi bug and compatible ACL test

---------

Co-authored-by: saz97 <[email protected]>
Co-authored-by: liuyuecai <[email protected]>
  • Loading branch information
3 people authored Jul 30, 2024
1 parent bd1d7af commit f2d8e9c
Show file tree
Hide file tree
Showing 8 changed files with 979 additions and 215 deletions.
3 changes: 1 addition & 2 deletions include/pika_client_conn.h
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,7 @@ class PikaClientConn : public net::RedisConn {
void AddKeysToWatch(const std::vector<std::string>& db_keys);
void RemoveWatchedKeys();
void SetTxnFailedFromKeys(const std::vector<std::string>& db_keys);
void SetAllTxnFailed();
void SetTxnFailedFromDBs(std::string db_name);
void SetTxnFailedIfKeyExists(const std::string target_db_name = "");
void ExitTxn();
bool IsInTxn();
bool IsTxnInitFailed();
Expand Down
2 changes: 1 addition & 1 deletion pikatests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ function setup_pika_bin {
exit 1
fi
cp $PIKA_BIN src/redis-server
cp conf/pika.conf tests/assets/default.conf
cp tests/conf/pika.conf tests/assets/default.conf
}


Expand Down
64 changes: 37 additions & 27 deletions src/pika_client_conn.cc
Original file line number Diff line number Diff line change
Expand Up @@ -186,28 +186,28 @@ std::shared_ptr<Cmd> PikaClientConn::DoCmd(const PikaCmdArgsType& argv, const st
}
}

// Process Command
c_ptr->Execute();
time_stat_->process_done_ts_ = pstd::NowMicros();
auto cmdstat_map = g_pika_cmd_table_manager->GetCommandStatMap();
(*cmdstat_map)[opt].cmd_count.fetch_add(1);
(*cmdstat_map)[opt].cmd_time_consuming.fetch_add(time_stat_->total_time());

if (c_ptr->res().ok() && c_ptr->is_write() && name() != kCmdNameExec) {
if (c_ptr->name() == kCmdNameFlushdb) {
auto flushdb = std::dynamic_pointer_cast<FlushdbCmd>(c_ptr);
SetTxnFailedFromDBs(flushdb->GetFlushDBname());
SetTxnFailedIfKeyExists(flushdb->GetFlushDBname());
} else if (c_ptr->name() == kCmdNameFlushall) {
SetAllTxnFailed();
SetTxnFailedIfKeyExists();
} else {
auto table_keys = c_ptr->current_key();
for (auto& key : table_keys) {
key = c_ptr->db_name().append(key);
key = c_ptr->db_name().append("_").append(key);
}
SetTxnFailedFromKeys(table_keys);
}
}

// Process Command
c_ptr->Execute();
time_stat_->process_done_ts_ = pstd::NowMicros();
auto cmdstat_map = g_pika_cmd_table_manager->GetCommandStatMap();
(*cmdstat_map)[opt].cmd_count.fetch_add(1);
(*cmdstat_map)[opt].cmd_time_consuming.fetch_add(time_stat_->total_time());

if (g_pika_conf->slowlog_slower_than() >= 0) {
ProcessSlowlog(argv, c_ptr->GetDoDuration());
}
Expand Down Expand Up @@ -387,32 +387,42 @@ void PikaClientConn::SetTxnFailedFromKeys(const std::vector<std::string>& db_key
auto involved_conns = std::vector<std::shared_ptr<NetConn>>{};
involved_conns = dispatcher->GetInvolvedTxn(db_keys);
for (auto& conn : involved_conns) {
if (auto c = std::dynamic_pointer_cast<PikaClientConn>(conn); c != nullptr && c.get() != this) {
if (auto c = std::dynamic_pointer_cast<PikaClientConn>(conn); c != nullptr) {
c->SetTxnWatchFailState(true);
}
}
}
}

void PikaClientConn::SetAllTxnFailed() {
// if key in target_db exists, then the key been watched multi will be failed
void PikaClientConn::SetTxnFailedIfKeyExists(std::string target_db_name) {
auto dispatcher = dynamic_cast<net::DispatchThread*>(server_thread());
if (dispatcher != nullptr) {
auto involved_conns = dispatcher->GetAllTxns();
for (auto& conn : involved_conns) {
if (auto c = std::dynamic_pointer_cast<PikaClientConn>(conn); c != nullptr && c.get() != this) {
c->SetTxnWatchFailState(true);
}
}
if (dispatcher == nullptr) {
return;
}
}
auto involved_conns = dispatcher->GetAllTxns();
for (auto& conn : involved_conns) {
std::shared_ptr<PikaClientConn> c;
if (c = std::dynamic_pointer_cast<PikaClientConn>(conn); c == nullptr) {
continue;
}

void PikaClientConn::SetTxnFailedFromDBs(std::string db_name) {
auto dispatcher = dynamic_cast<net::DispatchThread*>(server_thread());
if (dispatcher != nullptr) {
auto involved_conns = dispatcher->GetDBTxns(db_name);
for (auto& conn : involved_conns) {
if (auto c = std::dynamic_pointer_cast<PikaClientConn>(conn); c != nullptr && c.get() != this) {
c->SetTxnWatchFailState(true);
for (const auto& db_key : c->watched_db_keys_) {
size_t pos = db_key.find('_');
if (pos == std::string::npos) {
continue;
}

auto db_name = db_key.substr(0, pos);
auto key = db_key.substr(pos + 1);

if (target_db_name == "" || target_db_name == "all" || target_db_name == db_name) {
auto db = g_pika_server->GetDB(db_name);
// if watched key exists, set watch state to failed
if (db->storage()->Exists({key}) > 0) {
c->SetTxnWatchFailState(true);
break;
}
}
}
}
Expand Down
6 changes: 3 additions & 3 deletions src/pika_transaction.cc
Original file line number Diff line number Diff line change
Expand Up @@ -57,14 +57,14 @@ void ExecCmd::Do() {
if (cmd->name() == kCmdNameFlushall) {
auto flushall = std::dynamic_pointer_cast<FlushallCmd>(cmd);
flushall->FlushAllWithoutLock();
client_conn->SetAllTxnFailed();
client_conn->SetTxnFailedIfKeyExists();
} else if (cmd->name() == kCmdNameFlushdb) {
auto flushdb = std::dynamic_pointer_cast<FlushdbCmd>(cmd);
flushdb->DoWithoutLock();
if (cmd->res().ok()) {
cmd->res().SetRes(CmdRes::kOk);
}
client_conn->SetTxnFailedFromDBs(each_cmd_info.db_->GetDBName());
client_conn->SetTxnFailedIfKeyExists(each_cmd_info.db_->GetDBName());
} else {
cmd->Do();
if (cmd->res().ok() && cmd->is_write()) {
Expand Down Expand Up @@ -258,7 +258,7 @@ void WatchCmd::DoInitial() {
size_t pos = 1;
while (pos < argv_.size()) {
keys_.emplace_back(argv_[pos]);
db_keys_.push_back(db_name() + argv_[pos++]);
db_keys_.push_back(db_name() + "_" + argv_[pos++]);
}
}

Expand Down
27 changes: 23 additions & 4 deletions tests/assets/default.conf
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,17 @@ slow-cmd-thread-pool-size : 1
# Slow cmd list e.g. hgetall, mset
slow-cmd-list :

# The number of sync-thread for data replication from master, those are the threads work on slave nodes
# and are used to execute commands sent from master node when replicating.
# The number of threads to write DB in slaveNode when replicating.
# It's preferable to set slave's sync-thread-num value close to master's thread-pool-size.
sync-thread-num : 6

# The num of threads to write binlog in slaveNode when replicating,
# each DB cloud only bind to one sync-binlog-thread to write binlog in maximum
#[NOTICE] It's highly recommended to set sync-binlog-thread-num equal to conf item 'database'(then each DB cloud have a exclusive thread to write binlog),
# eg. if you use 8 DBs(databases_ is 8), sync-binlog-thread-num is preferable to be 8
# Valid range of sync-binlog-thread-num is [1, databases], the final value of it is Min(sync-binlog-thread-num, databases)
sync-binlog-thread-num : 1

# Directory to store log files of Pika, which contains multiple types of logs,
# Including: INFO, WARNING, ERROR log, as well as binglog(write2fine) file which
# is used for replication.
Expand Down Expand Up @@ -101,6 +108,8 @@ instance-mode : classic
# The default database id is DB 0. You can select a different one on
# a per-connection by using SELECT. The db id range is [0, 'databases' value -1].
# The value range of this parameter is [1, 8].
# [NOTICE] It's RECOMMENDED to set sync-binlog-thread-num equal to DB num(databases),
# if you've changed the value of databases, remember to check if the value of sync-binlog-thread-num is proper.
databases : 1

# The number of followers of a master. Only [0, 1, 2, 3, 4] is valid at present.
Expand Down Expand Up @@ -308,6 +317,11 @@ max-write-buffer-num : 2
# whether the key exists. Setting this value too high may hurt performance.
min-write-buffer-number-to-merge : 1

# The total size of wal files, when reaches this limit, rocksdb will force the flush of column-families
# whose memtables are backed by the oldest live WAL file. Also used to control the rocksdb open time when
# process restart.
max-total-wal-size : 1073741824

# rocksdb level0_stop_writes_trigger
level0-stop-writes-trigger : 36

Expand Down Expand Up @@ -466,9 +480,14 @@ default-slot-num : 1024
# The cache will be sharded into 2^blob-num-shard-bits shards.
# blob-num-shard-bits : -1

# Rsync Rate limiting configuration 200MB/s
# Rsync Rate limiting configuration [Default value is 200MB/s]
# [USED BY SLAVE] The transmitting speed(Rsync Rate) In full replication is controlled BY SLAVE NODE, You should modify the throttle-bytes-per-second in slave's pika.conf if you wanna change the rsync rate limit.
# [Dynamic Change Supported] send command 'config set throttle-bytes-per-second new_value' to SLAVE NODE can dynamically adjust rsync rate during full sync(use config rewrite can persist the changes).
throttle-bytes-per-second : 207200000

# Rsync timeout in full sync stage[Default value is 1000 ms], unnecessary retries will happen if this value is too small.
# [Dynamic Change Supported] similar to throttle-bytes-per-second, rsync-timeout-ms can be dynamically changed by configset command
# [USED BY SLAVE] Similar to throttle-bytes-per-second, you should change rsync-timeout-ms's value in slave's conf file if it is needed to adjust.
rsync-timeout-ms : 1000
# The valid range for max-rsync-parallel-num is [1, 4].
# If an invalid value is provided, max-rsync-parallel-num will automatically be reset to 4.
max-rsync-parallel-num : 4
Expand Down
Loading

0 comments on commit f2d8e9c

Please sign in to comment.