Skip to content

Commit

Permalink
fix: change storage ttl time from seconds to milliseconds (OpenAtomFo…
Browse files Browse the repository at this point in the history
…undation#2857)

* fix: change storage ttl time from seconds to milliseconds (OpenAtomFoundation#2822)

* change storage ttl time from seconds to milliseconds

---------

Co-authored-by: liuyuecai <[email protected]>

test space

* fix incr cmd time to millionsseconds

* rename SetRelativeTimeByMillsec to SetRelativeTimeInMillsec

---------

Co-authored-by: liuyuecai <[email protected]>
  • Loading branch information
luky116 and liuyuecai committed Sep 3, 2024
1 parent 20d8a43 commit f41be92
Show file tree
Hide file tree
Showing 38 changed files with 660 additions and 565 deletions.
1 change: 1 addition & 0 deletions .github/workflows/pika.yml
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ jobs:
- name: Start codis, pika master and pika slave
working-directory: ${{ github.workspace }}/build
run: |
echo "hello"
chmod +x ../tests/integration/start_master_and_slave.sh
../tests/integration/start_master_and_slave.sh
chmod +x ../tests/integration/start_codis.sh
Expand Down
2 changes: 1 addition & 1 deletion include/pika_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ class PikaCache : public pstd::noncopyable, public std::enable_shared_from_this<

rocksdb::Status Del(const std::vector<std::string>& keys);
rocksdb::Status Expire(std::string& key, int64_t ttl);
rocksdb::Status Expireat(std::string& key, int64_t ttl);
rocksdb::Status Expireat(std::string& key, int64_t ttl_sec);
rocksdb::Status TTL(std::string& key, int64_t* ttl);
rocksdb::Status Persist(std::string& key);
rocksdb::Status Type(std::string& key, std::string* value);
Expand Down
33 changes: 16 additions & 17 deletions include/pika_kv.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,12 @@ class SetCmd : public Cmd {
std::string value_;
std::string target_;
int32_t success_ = 0;
int64_t sec_ = 0;
int64_t ttl_millsec = 0;
bool has_ttl_ = false;
SetCmd::SetCondition condition_{kNONE};
void DoInitial() override;
void Clear() override {
sec_ = 0;
ttl_millsec = 0;
success_ = 0;
condition_ = kNONE;
}
Expand Down Expand Up @@ -69,7 +69,7 @@ class GetCmd : public Cmd {
private:
std::string key_;
std::string value_;
int64_t sec_ = 0;
int64_t ttl_millsec_ = 0;
void DoInitial() override;
rocksdb::Status s_;
};
Expand Down Expand Up @@ -115,7 +115,7 @@ class IncrCmd : public Cmd {
int64_t new_value_ = 0;
void DoInitial() override;
rocksdb::Status s_;
int64_t expired_timestamp_sec_ = 0;
int64_t expired_timestamp_millsec_ = 0;
std::string ToRedisProtocol() override;
};

Expand All @@ -140,7 +140,7 @@ class IncrbyCmd : public Cmd {
int64_t by_ = 0, new_value_ = 0;
void DoInitial() override;
rocksdb::Status s_;
int64_t expired_timestamp_sec_ = 0;
int64_t expired_timestamp_millsec_ = 0;
std::string ToRedisProtocol() override;
};

Expand All @@ -165,7 +165,7 @@ class IncrbyfloatCmd : public Cmd {
double by_ = 0;
void DoInitial() override;
rocksdb::Status s_;
int64_t expired_timestamp_sec_ = 0;
int64_t expired_timestamp_millsec_ = 0;
std::string ToRedisProtocol() override;
};

Expand Down Expand Up @@ -260,7 +260,7 @@ class AppendCmd : public Cmd {
std::string new_value_;
void DoInitial() override;
rocksdb::Status s_;
int64_t expired_timestamp_sec_ = 0;
int64_t expired_timestamp_millsec_ = 0;
std::string ToRedisProtocol() override;
};

Expand Down Expand Up @@ -351,7 +351,7 @@ class SetexCmd : public Cmd {

private:
std::string key_;
int64_t sec_ = 0;
int64_t ttl_sec_ = 0;
std::string value_;
void DoInitial() override;
rocksdb::Status s_;
Expand All @@ -376,7 +376,7 @@ class PsetexCmd : public Cmd {

private:
std::string key_;
int64_t usec_ = 0;
int64_t ttl_millsec = 0;
std::string value_;
void DoInitial() override;
rocksdb::Status s_;
Expand Down Expand Up @@ -540,7 +540,7 @@ class StrlenCmd : public Cmd {
private:
std::string key_;
std::string value_;
int64_t sec_ = 0;
int64_t ttl_millsec = 0;
void DoInitial() override;
rocksdb::Status s_;
};
Expand Down Expand Up @@ -581,7 +581,7 @@ class ExpireCmd : public Cmd {

private:
std::string key_;
int64_t sec_ = 0;
int64_t ttl_sec_ = 0;
void DoInitial() override;
std::string ToRedisProtocol() override;
rocksdb::Status s_;
Expand All @@ -605,7 +605,7 @@ class PexpireCmd : public Cmd {

private:
std::string key_;
int64_t msec_ = 0;
int64_t ttl_millsec = 0;
void DoInitial() override;
std::string ToRedisProtocol() override;
rocksdb::Status s_;
Expand All @@ -629,7 +629,7 @@ class ExpireatCmd : public Cmd {

private:
std::string key_;
int64_t time_stamp_ = 0;
int64_t time_stamp_sec_ = 0;
void DoInitial() override;
rocksdb::Status s_;
};
Expand All @@ -652,10 +652,9 @@ class PexpireatCmd : public Cmd {

private:
std::string key_;
int64_t time_stamp_ms_ = 0;
int64_t time_stamp_millsec_ = 0;
void DoInitial() override;
rocksdb::Status s_;
std::string ToRedisProtocol() override;
};

class TtlCmd : public Cmd {
Expand Down Expand Up @@ -810,9 +809,9 @@ class PKSetexAtCmd : public Cmd {
private:
std::string key_;
std::string value_;
int64_t time_stamp_ = 0;
int64_t time_stamp_sec_ = 0;
void DoInitial() override;
void Clear() override { time_stamp_ = 0; }
void Clear() override { time_stamp_sec_ = 0; }
rocksdb::Status s_;
};

Expand Down
2 changes: 1 addition & 1 deletion src/net/examples/performance/server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ int main(int argc, char* argv[]) {

std::unique_ptr<ServerThread> st_thread(NewDispatchThread(ip, port, 24, &conn_factory, 1000));
st_thread->StartThread();
uint64_t st, ed;
pstd::TimeType st, ed;

while (!should_stop) {
st = NowMicros();
Expand Down
8 changes: 4 additions & 4 deletions src/pika_admin.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2841,8 +2841,8 @@ void ConfigCmd::ConfigSet(std::shared_ptr<DB> db) {
"The rsync rate limit now is "
<< new_throughput_limit << "(Which Is Around " << (new_throughput_limit >> 20) << " MB/s)";
res_.AppendStringRaw("+OK\r\n");
} else if(set_item == "rsync-timeout-ms"){
if(pstd::string2int(value.data(), value.size(), &ival) == 0 || ival <= 0){
} else if (set_item == "rsync-timeout-ms") {
if (pstd::string2int(value.data(), value.size(), &ival) == 0 || ival <= 0) {
res_.AppendStringRaw("-ERR Invalid argument \'" + value + "\' for CONFIG SET 'rsync-timeout-ms'\r\n");
return;
}
Expand Down Expand Up @@ -3037,9 +3037,9 @@ void DbsizeCmd::Do() {
if (!dbs) {
res_.SetRes(CmdRes::kInvalidDB);
} else {
if (g_pika_conf->slotmigrate()){
if (g_pika_conf->slotmigrate()) {
int64_t dbsize = 0;
for (int i = 0; i < g_pika_conf->default_slot_num(); ++i){
for (int i = 0; i < g_pika_conf->default_slot_num(); ++i) {
int32_t card = 0;
rocksdb::Status s = dbs->storage()->SCard(SlotKeyPrefix+std::to_string(i), &card);
if (s.ok() && card >= 0) {
Expand Down
2 changes: 1 addition & 1 deletion src/pika_bit.cc
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ void BitGetCmd::DoThroughDB() {
Do();
}

void BitGetCmd::DoUpdateCache(){
void BitGetCmd::DoUpdateCache() {
if (s_.ok()) {
db_->cache()->PushKeyToAsyncLoadQueue(PIKA_KEY_TYPE_KV, key_, db_);
}
Expand Down
4 changes: 2 additions & 2 deletions src/pika_cache.cc
Original file line number Diff line number Diff line change
Expand Up @@ -126,10 +126,10 @@ Status PikaCache::Expire(std::string& key, int64_t ttl) {
return caches_[cache_index]->Expire(key, ttl);
}

Status PikaCache::Expireat(std::string& key, int64_t ttl) {
Status PikaCache::Expireat(std::string& key, int64_t ttl_sec) {
int cache_index = CacheIndex(key);
std::lock_guard lm(*cache_mutexs_[cache_index]);
return caches_[cache_index]->Expireat(key, ttl);
return caches_[cache_index]->Expireat(key, ttl_sec);
}

Status PikaCache::TTL(std::string& key, int64_t *ttl) {
Expand Down
6 changes: 3 additions & 3 deletions src/pika_cache_load_thread.cc
Original file line number Diff line number Diff line change
Expand Up @@ -113,13 +113,13 @@ bool PikaCacheLoadThread::LoadSet(std::string& key, const std::shared_ptr<DB>& d
}

std::vector<std::string> values;
int64_t ttl = -1;
rocksdb::Status s = db->storage()->SMembersWithTTL(key, &values, &ttl);
int64_t ttl_millsec = -1;
rocksdb::Status s = db->storage()->SMembersWithTTL(key, &values, &ttl_millsec);
if (!s.ok()) {
LOG(WARNING) << "load set failed, key=" << key;
return false;
}
db->cache()->WriteSetToCache(key, values, ttl);
db->cache()->WriteSetToCache(key, values, ttl_millsec > 0 ? ttl_millsec / 1000 : ttl_millsec);
return true;
}

Expand Down
4 changes: 2 additions & 2 deletions src/pika_conf.cc
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ int PikaConf::Load() {

std::string admin_cmd_list;
GetConfStr("admin-cmd-list", &admin_cmd_list);
if (admin_cmd_list == ""){
if (admin_cmd_list == "") {
admin_cmd_list = "info, monitor, ping";
SetAdminCmd(admin_cmd_list);
}
Expand Down Expand Up @@ -724,7 +724,7 @@ int PikaConf::Load() {

int64_t tmp_rsync_timeout_ms = -1;
GetConfInt64("rsync-timeout-ms", &tmp_rsync_timeout_ms);
if(tmp_rsync_timeout_ms <= 0){
if (tmp_rsync_timeout_ms <= 0) {
rsync_timeout_ms_.store(1000);
} else {
rsync_timeout_ms_.store(tmp_rsync_timeout_ms);
Expand Down
2 changes: 1 addition & 1 deletion src/pika_geo.cc
Original file line number Diff line number Diff line change
Expand Up @@ -366,7 +366,7 @@ static void GetAllNeighbors(const std::shared_ptr<DB>& db, std::string& key, Geo
int32_t count = 0;
int32_t card = db->storage()->Exists({range.storekey});
if (card) {
if (db->storage()->Del({range.storekey}) > 0){
if (db->storage()->Del({range.storekey}) > 0) {
db->cache()->Del({range.storekey});
}
}
Expand Down
Loading

0 comments on commit f41be92

Please sign in to comment.