Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support blpop/brpop #1548

Merged
merged 57 commits into from
Jul 19, 2023
Merged
Show file tree
Hide file tree
Changes from 21 commits
Commits
Show all changes
57 commits
Select commit Hold shift + click to select a range
acb8ee9
working on BlpopCmd::Doinitial
cheniujh May 15, 2023
ab8caf4
working on : making the maps and new monitoring thread
cheniujh May 16, 2023
fc6f771
added bLRPop_blocking_info_ in pika_server.cc
cheniujh May 16, 2023
e2558d5
adding functions
cheniujh May 17, 2023
35273ee
adding functions02
cheniujh May 17, 2023
324fe89
temp save
cheniujh May 17, 2023
c67cdfe
Merge branch 'OpenAtomFoundation:unstable' into add_blpop_brpop
cheniujh May 18, 2023
6a47103
data structure done
cheniujh May 18, 2023
934c3bb
working
cheniujh May 20, 2023
fe79658
revised data structure
cheniujh May 20, 2023
2ab0ad4
working on moving
cheniujh May 20, 2023
1c9a278
working
cheniujh May 20, 2023
a1fd89f
Merge branch 'OpenAtomFoundation:unstable' into add_blpop_brpop
cheniujh May 21, 2023
abebaa3
next step is to add a timeout scan
cheniujh May 21, 2023
2f0d057
added TimerTaskManager
cheniujh May 21, 2023
5cf9461
modified TimerTaskManager(v2)
cheniujh May 21, 2023
c867955
scan added
cheniujh May 21, 2023
24cd349
tiny fix
cheniujh May 22, 2023
bbfc0d0
changed the task of unblocking conn from sync to async
cheniujh May 23, 2023
b3399e5
Merge branch 'back_up_add_blpop_brpop' of github.com:cheniujh/pika in…
cheniujh May 23, 2023
3f6b702
removed some code for testing
cheniujh May 23, 2023
5a271b3
pull from unstable and solve conflicts
cheniujh Jun 4, 2023
6ab355a
change from partition to db
cheniujh Jun 4, 2023
c8b9ad6
temp save for testing
cheniujh Jun 5, 2023
ac7a75d
handle binlog of blr/pop(when conn get served, write a binlog of lpop…
cheniujh Jun 6, 2023
8be5a32
Merge branch 'OpenAtomFoundation:unstable' into new_add_blpop_brpop
cheniujh Jun 6, 2023
3bb995a
improved sanitizer options in CMakeLists
cheniujh Jun 6, 2023
e2dbb57
temp save
cheniujh Jun 8, 2023
33de9fc
add unit test
cheniujh Jun 8, 2023
ae9b951
modified cmakelists
cheniujh Jun 8, 2023
9693eab
temp save
cheniujh Jun 10, 2023
5742588
improved code based on reviwer's opinion
cheniujh Jun 10, 2023
c0e0155
Merge branch 'OpenAtomFoundation:unstable' into new_add_blpop_brpop
cheniujh Jun 13, 2023
9581675
renamed some variables
cheniujh Jun 13, 2023
8362eac
Merge branch 'OpenAtomFoundation:unstable' into new_add_blpop_brpop
cheniujh Jun 14, 2023
3f3376d
Merge remote-tracking branch 'origin/new_add_blpop_brpop' into new_ad…
cheniujh Jun 14, 2023
819576f
1. added record lock in ServeAndUnblockConns(void* args)
cheniujh Jun 14, 2023
a511054
added multi-db concurrency test
cheniujh Jun 14, 2023
590faf4
renamed a flag from kCmdFlagsMayDfferWrite to kCmdFlagsMayDfferWriteB…
cheniujh Jun 14, 2023
a440420
1. added record lock in ServeAndUnblockConns(void* args)
cheniujh Jun 14, 2023
c34e997
Merge remote-tracking branch 'origin/new_add_blpop_brpop' into new_ad…
cheniujh Jun 16, 2023
b600811
removed unsed code
cheniujh Jun 16, 2023
6163b42
Merge branch 'OpenAtomFoundation:unstable' into new_add_blpop_brpop
cheniujh Jun 19, 2023
3cfa0dd
Merge branch 'OpenAtomFoundation:unstable' into new_add_blpop_brpop
cheniujh Jun 19, 2023
96f72f7
revised code based on opinion of reviewer
cheniujh Jun 19, 2023
5ddb1d5
Merge branch 'unstable_new' of github.com:cheniujh/pika into new_add_…
cheniujh Jun 20, 2023
b9e55f8
Merge branch 'unstable' of github.com:cheniujh/pika into new_add_blpo…
cheniujh Jul 5, 2023
4dbe71f
add the unit test to github workflow
cheniujh Jul 5, 2023
0ba21ee
revised unit test file
cheniujh Jul 5, 2023
24d123e
removed time counting
cheniujh Jul 6, 2023
f324669
clear github action cache
cheniujh Jul 13, 2023
5e6ad36
Merge branch 'OpenAtomFoundation:unstable' into new_add_blpop_brpop
cheniujh Jul 13, 2023
8619909
revised based on reviwer's opinions
cheniujh Jul 13, 2023
70d19f9
Merge branch 'OpenAtomFoundation:unstable' into new_add_blpop_brpop
cheniujh Jul 18, 2023
5d061da
Modify to adapt to Mac.
cheniujh Jul 18, 2023
100d07a
Merge remote-tracking branch 'origin/new_add_blpop_brpop' into new_ad…
cheniujh Jul 18, 2023
c409cd0
Modify to adapt to Mac 2.
cheniujh Jul 18, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions include/pika_client_conn.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ class PikaClientConn : public net::RedisConn {
bool IsPubSub() { return is_pubsub_; }
void SetIsPubSub(bool is_pubsub) { is_pubsub_ = is_pubsub; }
void SetCurrentTable(const std::string& table_name) { current_table_ = table_name; }
const std::string& GetCurrentTable() override{ return current_table_; }
void SetWriteCompleteCallback(WriteCompleteCallback cb) { write_completed_cb_ = cb; }

net::ServerThread* server_thread() { return server_thread_; }
cheniujh marked this conversation as resolved.
Show resolved Hide resolved
Expand Down
2 changes: 2 additions & 0 deletions include/pika_command.h
Original file line number Diff line number Diff line change
Expand Up @@ -108,13 +108,15 @@ const std::string kCmdNamePKHRScanRange = "pkhrscanrange";
const std::string kCmdNameLIndex = "lindex";
const std::string kCmdNameLInsert = "linsert";
const std::string kCmdNameLLen = "llen";
const std::string kCmdNameBLPop = "blpop";
const std::string kCmdNameLPop = "lpop";
const std::string kCmdNameLPush = "lpush";
const std::string kCmdNameLPushx = "lpushx";
const std::string kCmdNameLRange = "lrange";
const std::string kCmdNameLRem = "lrem";
const std::string kCmdNameLSet = "lset";
const std::string kCmdNameLTrim = "ltrim";
const std::string kCmdNameBrpop = "brpop";
const std::string kCmdNameRPop = "rpop";
const std::string kCmdNameRPopLPush = "rpoplpush";
const std::string kCmdNameRPush = "rpush";
cheniujh marked this conversation as resolved.
Show resolved Hide resolved
Expand Down
57 changes: 51 additions & 6 deletions include/pika_list.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

#include "include/pika_command.h"
#include "include/pika_partition.h"
#include "net/src/dispatch_thread.h"

/*
* list
Expand Down Expand Up @@ -73,6 +74,29 @@ class LLenCmd : public Cmd {
virtual void DoInitial() override;
};

class BLRPopBaseCmd : public Cmd {
cheniujh marked this conversation as resolved.
Show resolved Hide resolved
public:
BLRPopBaseCmd(const std::string& name, int arity, uint16_t flag) : Cmd(name, arity, flag) {}
void BlockThisClientToWaitLRPush(net::BlockPopType block_pop_type);
protected:
void DoInitial() override;
std::vector<std::string> keys_;
int64_t expire_time_{0};
};

class BLPopCmd : public BLRPopBaseCmd {
cheniujh marked this conversation as resolved.
Show resolved Hide resolved
public:
BLPopCmd(const std::string& name, int arity, uint16_t flag) : BLRPopBaseCmd(name, arity, flag){};
virtual std::vector<std::string> current_key() const {
std::vector<std::string> res = keys_;
return res;
}
virtual void Do(std::shared_ptr<Partition> partition = nullptr);
virtual void Split(std::shared_ptr<Partition> partition, const HintKeys& hint_keys){};
virtual void Merge(){};
virtual Cmd* Clone() override { return new BLPopCmd(*this); }
};

class LPopCmd : public Cmd {
public:
LPopCmd(const std::string& name, int arity, uint16_t flag) : Cmd(name, arity, flag){};
Expand All @@ -91,9 +115,16 @@ class LPopCmd : public Cmd {
virtual void DoInitial() override;
};

class LPushCmd : public Cmd {
class BPopServeCmd : public Cmd {
public:
LPushCmd(const std::string& name, int arity, uint16_t flag) : Cmd(name, arity, flag){};
BPopServeCmd(const std::string& name, int arity, uint16_t flag) : Cmd(name, arity, flag) {}
void TryToServeBLrPopWithThisKey(const std::string& key, std::shared_ptr<Partition> partition);
static void ServeAndUnblockConns(void* args);
};

class LPushCmd : public BPopServeCmd {
public:
LPushCmd(const std::string& name, int arity, uint16_t flag) : BPopServeCmd(name, arity, flag){};
virtual std::vector<std::string> current_key() const {
std::vector<std::string> res;
res.push_back(key_);
Expand Down Expand Up @@ -210,6 +241,20 @@ class LTrimCmd : public Cmd {
virtual void DoInitial() override;
};

class BRPopCmd : public BLRPopBaseCmd {
public:
BRPopCmd(const std::string& name, int arity, uint16_t flag) : BLRPopBaseCmd(name, arity, flag){};
virtual std::vector<std::string> current_key() const {
std::vector<std::string> res = keys_;
return res;
cheniujh marked this conversation as resolved.
Show resolved Hide resolved
}
virtual void Do(std::shared_ptr<Partition> partition = nullptr);
virtual void Split(std::shared_ptr<Partition> partition, const HintKeys& hint_keys){};
virtual void Merge(){};
virtual Cmd* Clone() override { return new BRPopCmd(*this); }

};

class RPopCmd : public Cmd {
public:
RPopCmd(const std::string& name, int arity, uint16_t flag) : Cmd(name, arity, flag){};
Expand All @@ -228,9 +273,9 @@ class RPopCmd : public Cmd {
virtual void DoInitial() override;
};

class RPopLPushCmd : public Cmd {
class RPopLPushCmd : public BPopServeCmd {
public:
RPopLPushCmd(const std::string& name, int arity, uint16_t flag) : Cmd(name, arity, flag){};
RPopLPushCmd(const std::string& name, int arity, uint16_t flag) : BPopServeCmd(name, arity, flag){};
std::vector<std::string> current_key() const override {
std::vector<std::string> res;
res.push_back(source_);
Expand All @@ -247,9 +292,9 @@ class RPopLPushCmd : public Cmd {
virtual void DoInitial() override;
};

class RPushCmd : public Cmd {
class RPushCmd : public BPopServeCmd {
public:
RPushCmd(const std::string& name, int arity, uint16_t flag) : Cmd(name, arity, flag){};
RPushCmd(const std::string& name, int arity, uint16_t flag) : BPopServeCmd(name, arity, flag){};
virtual std::vector<std::string> current_key() const {
std::vector<std::string> res;
res.push_back(key_);
cheniujh marked this conversation as resolved.
Show resolved Hide resolved
cheniujh marked this conversation as resolved.
Show resolved Hide resolved
Expand Down
12 changes: 8 additions & 4 deletions include/pika_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ enum TaskType {
kBgSave,
};


class PikaServer {
public:
PikaServer();
Expand Down Expand Up @@ -324,6 +325,8 @@ class PikaServer {
storage::Status RewriteStorageOptions(const storage::OptionType& option_type,
const std::unordered_map<std::string, std::string>& options);



friend class Cmd;
friend class InfoCmd;
friend class PkClusterAddSlotsCmd;
Expand Down Expand Up @@ -360,6 +363,7 @@ class PikaServer {
std::shared_mutex tables_rw_;
std::map<std::string, std::shared_ptr<Table>> tables_;


/*
* CronTask used
*/
Expand All @@ -377,15 +381,15 @@ class PikaServer {
* Slave used
*/
std::string master_ip_;
int master_port_ = 0;
int repl_state_ = PIKA_REPL_NO_CONNECT;
int master_port_ = 0;
int repl_state_ = PIKA_REPL_NO_CONNECT;
int role_ = PIKA_ROLE_SINGLE;
int last_meta_sync_timestamp_ = 0;
bool first_meta_sync_ = false;
bool loop_partition_state_machine_ = false;
bool force_full_sync_ = false;
bool leader_protected_mode_ = false; // reject request after master slave sync done
std::shared_mutex state_protector_; // protect below, use for master-slave mode
bool leader_protected_mode_ = false; // reject request after master slave sync done
std::shared_mutex state_protector_; // protect below, use for master-slave mode

/*
* Bgsave used
Expand Down
2 changes: 1 addition & 1 deletion src/net/include/redis_conn.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ class RedisConn : public NetConn {
void NotifyEpoll(bool success);

virtual int DealMessage(const RedisCmdArgsType& argv, std::string* response) = 0;

virtual const std::string& GetCurrentTable() = 0;
private:
static int ParserDealMessageCb(RedisParser* parser, const RedisCmdArgsType& argv);
cheniujh marked this conversation as resolved.
Show resolved Hide resolved
static int ParserCompleteCb(RedisParser* parser, const std::vector<RedisCmdArgsType>& argvs);
Expand Down
9 changes: 6 additions & 3 deletions src/net/include/server_thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include "net/include/net_define.h"
#include "net/include/net_thread.h"
#include "net/src/net_multiplexer.h"
#include "pstd/include/env.h"
#include "pstd/include/pstd_mutex.h"
#include "pstd/include/pstd_status.h"

Expand Down Expand Up @@ -111,9 +112,9 @@ const int kDefaultKeepAliveTime = 60; // (s)

class ServerThread : public Thread {
public:
ServerThread(int port, int cron_interval, const ServerHandle* handle);
ServerThread(const std::string& bind_ip, int port, int cron_interval, const ServerHandle* handle);
ServerThread(const std::set<std::string>& bind_ips, int port, int cron_interval, const ServerHandle* handle);
ServerThread(int port, int cron_interval, const ServerHandle* handle, ServerThread* dispatcher = nullptr);
ServerThread(const std::string& bind_ip, int port, int cron_interval, const ServerHandle* handle, ServerThread* dispatcher = nullptr);
ServerThread(const std::set<std::string>& bind_ips, int port, int cron_interval, const ServerHandle* handle, ServerThread* dispatcher = nullptr);

#ifdef __ENABLE_SSL
/*
Expand Down Expand Up @@ -175,6 +176,7 @@ class ServerThread : public Thread {
// process events in notify_queue
virtual void ProcessNotifyEvents(const NetFiredEvent* pfe);

ServerThread* dispatcher_;
const ServerHandle* handle_;
bool own_handle_ = false;

cheniujh marked this conversation as resolved.
Show resolved Hide resolved
cheniujh marked this conversation as resolved.
Show resolved Hide resolved
Expand All @@ -197,6 +199,7 @@ class ServerThread : public Thread {
* The server event handle
*/
virtual void HandleConnEvent(NetFiredEvent* pfe) = 0;

};

// !!!Attention: If u use this constructor, the keepalive_timeout_ will
Expand Down
18 changes: 9 additions & 9 deletions src/net/src/dispatch_thread.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,37 +16,37 @@ namespace net {

DispatchThread::DispatchThread(int port, int work_num, ConnFactory* conn_factory, int cron_interval, int queue_limit,
const ServerHandle* handle)
: ServerThread::ServerThread(port, cron_interval, handle),
: ServerThread::ServerThread(port, cron_interval, handle, this),
last_thread_(0),
work_num_(work_num),
queue_limit_(queue_limit) {
queue_limit_(queue_limit),
timedTaskManager(net_multiplexer_->GetMultiplexer()) {
for (int i = 0; i < work_num_; i++) {
worker_thread_.emplace_back(std::make_unique<WorkerThread>(conn_factory, this, queue_limit, cron_interval));
}
}

DispatchThread::DispatchThread(const std::string& ip, int port, int work_num, ConnFactory* conn_factory,
int cron_interval, int queue_limit, const ServerHandle* handle)
: ServerThread::ServerThread(ip, port, cron_interval, handle),
: ServerThread::ServerThread(ip, port, cron_interval, handle, this),
last_thread_(0),
work_num_(work_num),
queue_limit_(queue_limit) {

queue_limit_(queue_limit),
timedTaskManager(net_multiplexer_->GetMultiplexer()){
for (int i = 0; i < work_num_; i++) {
worker_thread_.emplace_back(std::make_unique<WorkerThread>(conn_factory, this, queue_limit, cron_interval));
}
}

DispatchThread::DispatchThread(const std::set<std::string>& ips, int port, int work_num, ConnFactory* conn_factory,
int cron_interval, int queue_limit, const ServerHandle* handle)
: ServerThread::ServerThread(ips, port, cron_interval, handle),
: ServerThread::ServerThread(ips, port, cron_interval, handle, this),
last_thread_(0),
work_num_(work_num),
queue_limit_(queue_limit) {

queue_limit_(queue_limit),
timedTaskManager(net_multiplexer_->GetMultiplexer()){
for (int i = 0; i < work_num_; i++) {
worker_thread_.emplace_back(std::make_unique<WorkerThread>(conn_factory, this, queue_limit, cron_interval));

}
}

Expand Down
Loading