Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

change the dir name: src/pink -> src/net #1350

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -486,7 +486,7 @@ set(ROCKSDB_INCLUDE_DIR ${INSTALL_INCLUDEDIR})
set(ROCKSDB_SOURCE_DIR ${CMAKE_CURRENT_SOURCE_DIR}/${EP_BASE_SUFFIX}/Source/rocksdb)

add_subdirectory(src/pstd)
add_subdirectory(src/pink)
add_subdirectory(src/net)
add_subdirectory(src/blackwidow)
aux_source_directory(src DIR_SRCS)

Expand Down Expand Up @@ -551,7 +551,7 @@ add_dependencies(${PROJECT_NAME}
rocksdb
protobuf
pstd
pink
net
blackwidow)

target_include_directories(${PROJECT_NAME}
Expand All @@ -562,7 +562,7 @@ target_include_directories(${PROJECT_NAME}

target_link_libraries(${PROJECT_NAME}
blackwidow
pink
net
pstd
${LIB_GLOG}
librocksdb.a
Expand Down
32 changes: 16 additions & 16 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
CLEAN_FILES = # deliberately empty, so we can append below.
CXX=g++
PLATFORM_LDFLAGS= -lpthread -lrt
PLATFORM_CXXFLAGS= -std=c++11 -fno-builtin-memcmp -msse -msse4.2
PLATFORM_CXXFLAGS= -std=c++11 -fno-builtin-memcmp -msse -msse4.2
ROCKSDB_CXXFLAGS=
CC_VERSION_MAJOR := $(shell $(CXX) -dumpversion | cut -d '.' -f1)
ifeq (1,$(shell expr $(CC_VERSION_MAJOR) \> 7))
Expand Down Expand Up @@ -63,14 +63,14 @@ SRC_PATH = $(CURDIR)/src
# ----------------Dependences-------------------

ifndef SLASH_PATH
SLASH_PATH = $(THIRD_PATH)/slash
SLASH_PATH = $(THIRD_PATH)/pstd
endif
SLASH = $(SLASH_PATH)/slash/lib/libslash$(DEBUG_SUFFIX).a
SLASH = $(SLASH_PATH)/pstd/lib/libpstd$(DEBUG_SUFFIX).a

ifndef PINK_PATH
PINK_PATH = $(THIRD_PATH)/pink
PINK_PATH = $(THIRD_PATH)/net
endif
PINK = $(PINK_PATH)/pink/lib/libpink$(DEBUG_SUFFIX).a
PINK = $(PINK_PATH)/net/lib/libnet$(DEBUG_SUFFIX).a

ifndef ROCKSDB_PATH
ROCKSDB_PATH = $(THIRD_PATH)/rocksdb
Expand Down Expand Up @@ -103,8 +103,8 @@ INCLUDE_PATH += -I$(GLOG_PATH)/src
endif

LIB_PATH = -L./ \
-L$(SLASH_PATH)/slash/lib \
-L$(PINK_PATH)/pink/lib \
-L$(SLASH_PATH)/pstd/lib \
-L$(PINK_PATH)/net/lib \
-L$(BLACKWIDOW_PATH)/lib \
-L$(ROCKSDB_PATH) \

Expand All @@ -113,8 +113,8 @@ LIB_PATH += -L$(GLOG_PATH)/.libs
endif

LDFLAGS += $(LIB_PATH) \
-lpink$(DEBUG_SUFFIX) \
-lslash$(DEBUG_SUFFIX) \
-lnet$(DEBUG_SUFFIX) \
-lpstd$(DEBUG_SUFFIX) \
-lblackwidow$(DEBUG_SUFFIX) \
-lrocksdb$(DEBUG_SUFFIX) \
-lglog \
Expand Down Expand Up @@ -189,7 +189,7 @@ $(SRC_PATH)/build_version.cc: FORCE
$(AM_V_at)if test -f $@; then \
cmp -s $@-t $@ && rm -f $@-t || mv -f $@-t $@; \
else mv -f $@-t $@; fi
FORCE:
FORCE:

LIBOBJECTS = $(LIB_SOURCES:.cc=.o)
PROTOOBJECTS = $(PIKA_PROTO:.proto=.pb.o)
Expand Down Expand Up @@ -222,16 +222,16 @@ $(BINARY): $(SLASH) $(PINK) $(ROCKSDB) $(BLACKWIDOW) $(GLOG) $(PROTOOBJECTS) $(L
$(AM_V_at)mkdir -p $(OUTPUT)/bin
$(AM_V_at)mv $@ $(OUTPUT)/bin
$(AM_V_at)cp -r $(CURDIR)/conf $(OUTPUT)


$(SLASH):
$(AM_V_at)make -C $(SLASH_PATH)/slash/ DEBUG_LEVEL=$(DEBUG_LEVEL)
$(AM_V_at)make -C $(SLASH_PATH)/pstd/ DEBUG_LEVEL=$(DEBUG_LEVEL)

$(PINK):
$(AM_V_at)make -C $(PINK_PATH)/pink/ DEBUG_LEVEL=$(DEBUG_LEVEL) NO_PB=0 SLASH_PATH=$(SLASH_PATH)
$(AM_V_at)make -C $(PINK_PATH)/net/ DEBUG_LEVEL=$(DEBUG_LEVEL) NO_PB=0 SLASH_PATH=$(SLASH_PATH)

$(ROCKSDB):
$(AM_V_at) CXXFLAGS='$(ROCKSDB_CXXFLAGS)' make -j $(PROCESSOR_NUMS) -C $(ROCKSDB_PATH)/ static_lib DISABLE_JEMALLOC=1 DEBUG_LEVEL=$(DEBUG_LEVEL)
$(AM_V_at) CXXFLAGS='$(ROCKSDB_CXXFLAGS)' make -j $(PROCESSOR_NUMS) -C $(ROCKSDB_PATH)/ static_lib DISABLE_JEMALLOC=1 DEBUG_LEVEL=$(DEBUG_LEVEL)

$(BLACKWIDOW):
$(AM_V_at)make -C $(BLACKWIDOW_PATH) ROCKSDB_PATH=$(ROCKSDB_PATH) SLASH_PATH=$(SLASH_PATH) DEBUG_LEVEL=$(DEBUG_LEVEL)
Expand All @@ -247,8 +247,8 @@ clean:
find $(SRC_PATH) -type f -regex ".*\.\(\(gcda\)\|\(gcno\)\)" -exec rm {} \;

distclean: clean
make -C $(PINK_PATH)/pink/ SLASH_PATH=$(SLASH_PATH) clean
make -C $(SLASH_PATH)/slash/ clean
make -C $(PINK_PATH)/net/ SLASH_PATH=$(SLASH_PATH) clean
make -C $(SLASH_PATH)/pstd/ clean
make -C $(BLACKWIDOW_PATH)/ clean
make -C $(ROCKSDB_PATH)/ clean
# make -C $(GLOG_PATH)/ clean
4 changes: 2 additions & 2 deletions docs/ops/FAQ.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ A3: 一些旧版本的pika对Ubuntu环境兼容不好,某些情况下会出

### 2 设计与实现
#### Q1: 为什么要开那么多线程?比如purge,搞个定时任务不就好了。难道编程框架不支持定时器?
A1: pika有一些比较耗时的任务,如删binlog,扫描key,备份,同步数据文件等等,为了不影响正常的用户请求,这些任务都是放到后台执行的,并且将能并行的都放到不同线程里来最大程度上提升后台任务的执行速度;你说的变成框架是pink吗?pink是支持定时器的,每一个workerthread只要用户定义了cronhandle和频率,就会定时执行要执行的内容,不过这时候worker是被独占的,响应不了用户请求,所以占时的任务最好还是单独开线程去做,redis的bio也是这个原因
A1: pika有一些比较耗时的任务,如删binlog,扫描key,备份,同步数据文件等等,为了不影响正常的用户请求,这些任务都是放到后台执行的,并且将能并行的都放到不同线程里来最大程度上提升后台任务的执行速度;你说的变成框架是pink (网络库名称, 代码位于 src/net) 吗?pink是支持定时器的,每一个workerthread只要用户定义了cronhandle和频率,就会定时执行要执行的内容,不过这时候worker是被独占的,响应不了用户请求,所以占时的任务最好还是单独开线程去做,redis的bio也是这个原因

#### Q2: heartbeat让sender做不就好了?或者说sender有必要那么多线程吗?
A2: 这主要有两个原因,第一为了提高同步速度,sender只发不收,receiver只收不发,心跳是又单独的线程去做,如果心跳又sender来做,那么为了一秒仅有一次的心跳还要去复杂化sender和receiver的逻辑;第二其实前期尝试过合并在一起来进行连接级别的存活检测,当写入压力过大的时候会心跳包的收发会延后,导致存活检测被影响,slave误判master超时而进行不必要的重连
Expand Down Expand Up @@ -86,4 +86,4 @@ A22: 一主 14 从的场景是用户的写入都是晚上定期的灌数据,
#### Q23:设置了expire-logs-nums(至少为10)和binlog过期时间,为何master中仍然有大量的write2file文件?
A23: pika会定期检查binlog文件,如果binlog数目超过了expire-logs-nums或者过期时间,并且所有的从节点都已经对该binlog文件进行过同步,那么binlog文件就会被删除。确认过expire-logs-nums和过期时间设置正确,可以通过info命令查看是否有从节点同步延迟比较大,导致binlog无法被删除。

##### 如果您有其他问题,请联系直接在github issue上描述您的问题,我们第一时间回复。加入QQ群294254078,我们会不定期在群里发布更新消息。
##### 如果您有其他问题,请联系直接在github issue上描述您的问题,我们第一时间回复。加入QQ群294254078,我们会不定期在群里发布更新消息。
4 changes: 2 additions & 2 deletions include/pika_auxiliary_thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,11 @@
#ifndef PIKA_AUXILIARY_THREAD_H_
#define PIKA_AUXILIARY_THREAD_H_

#include "pink/include/pink_thread.h"
#include "net/include/net_thread.h"

#include "pstd/include/pstd_mutex.h"

class PikaAuxiliaryThread : public pink::Thread {
class PikaAuxiliaryThread : public net::Thread {
public:
PikaAuxiliaryThread() :
mu_(),
Expand Down
20 changes: 10 additions & 10 deletions include/pika_client_conn.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,14 @@
#include "include/pika_command.h"


class PikaClientConn: public pink::RedisConn {
class PikaClientConn: public net::RedisConn {
public:
using WriteCompleteCallback = std::function<void()>;

struct BgTaskArg {
std::shared_ptr<Cmd> cmd_ptr;
std::shared_ptr<PikaClientConn> conn_ptr;
std::vector<pink::RedisCmdArgsType> redis_cmds;
std::vector<net::RedisCmdArgsType> redis_cmds;
std::shared_ptr<std::string> resp_ptr;
LogOffset offset;
std::string table_name;
Expand All @@ -39,16 +39,16 @@ class PikaClientConn: public pink::RedisConn {
};

PikaClientConn(int fd, std::string ip_port,
pink::Thread *server_thread,
pink::PinkEpoll* pink_epoll,
const pink::HandleType& handle_type,
net::Thread *server_thread,
net::NetEpoll* net_epoll,
const net::HandleType& handle_type,
int max_conn_rubf_size);
virtual ~PikaClientConn() {}

virtual void ProcessRedisCmds(const std::vector<pink::RedisCmdArgsType>& argvs, bool async, std::string* response) override;
virtual void ProcessRedisCmds(const std::vector<net::RedisCmdArgsType>& argvs, bool async, std::string* response) override;

void BatchExecRedisCmd(const std::vector<pink::RedisCmdArgsType>& argvs);
int DealMessage(const pink::RedisCmdArgsType& argv, std::string* response) {
void BatchExecRedisCmd(const std::vector<net::RedisCmdArgsType>& argvs);
int DealMessage(const net::RedisCmdArgsType& argv, std::string* response) {
return 0;
}
static void DoBackgroundTask(void* arg);
Expand All @@ -59,7 +59,7 @@ class PikaClientConn: public pink::RedisConn {
void SetCurrentTable(const std::string& table_name) { current_table_ = table_name; }
void SetWriteCompleteCallback(WriteCompleteCallback cb) { write_completed_cb_ = cb; }

pink::ServerThread* server_thread() {
net::ServerThread* server_thread() {
return server_thread_;
}

Expand All @@ -70,7 +70,7 @@ class PikaClientConn: public pink::RedisConn {
std::atomic<int> resp_num;
std::vector<std::shared_ptr<std::string>> resp_array;
private:
pink::ServerThread* const server_thread_;
net::ServerThread* const server_thread_;
std::string current_table_;
WriteCompleteCallback write_completed_cb_;
bool is_pubsub_;
Expand Down
12 changes: 6 additions & 6 deletions include/pika_client_processor.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@
#include <vector>
#include <string>

#include "pink/include/thread_pool.h"
#include "pink/include/bg_thread.h"
#include "net/include/thread_pool.h"
#include "net/include/bg_thread.h"

class PikaClientProcessor {
public:
Expand All @@ -21,13 +21,13 @@ class PikaClientProcessor {
~PikaClientProcessor();
int Start();
void Stop();
void SchedulePool(pink::TaskFunc func, void* arg);
void SchedulePool(net::TaskFunc func, void* arg);
void ScheduleBgThreads(
pink::TaskFunc func, void* arg, const std::string& hash_str);
net::TaskFunc func, void* arg, const std::string& hash_str);
size_t ThreadPoolCurQueueSize();

private:
pink::ThreadPool* pool_;
std::vector<pink::BGThread*> bg_threads_;
net::ThreadPool* pool_;
std::vector<net::BGThread*> bg_threads_;
};
#endif // PIKA_CLIENT_PROCESSOR_H_
12 changes: 6 additions & 6 deletions include/pika_command.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@

#include <unordered_map>

#include "pink/include/redis_conn.h"
#include "pink/include/pink_conn.h"
#include "net/include/redis_conn.h"
#include "net/include/net_conn.h"
#include "pstd/include/pstd_string.h"

#include "include/pika_partition.h"
Expand Down Expand Up @@ -218,7 +218,7 @@ const std::string kCmdNamePkClusterAddTable = "pkclusteraddtable";
const std::string kCmdNamePkClusterDelTable = "pkclusterdeltable";

const std::string kClusterPrefix = "pkcluster";
typedef pink::RedisCmdArgsType PikaCmdArgsType;
typedef net::RedisCmdArgsType PikaCmdArgsType;
static const int RAW_ARGS_LEN = 1024 * 1024;

enum CmdFlagsMask {
Expand Down Expand Up @@ -479,8 +479,8 @@ class Cmd: public std::enable_shared_from_this<Cmd> {
uint32_t filenum,
uint64_t offset);

void SetConn(const std::shared_ptr<pink::PinkConn> conn);
std::shared_ptr<pink::PinkConn> GetConn();
void SetConn(const std::shared_ptr<net::NetConn> conn);
std::shared_ptr<net::NetConn> GetConn();

void SetResp(const std::shared_ptr<std::string> resp);
std::shared_ptr<std::string> GetResp();
Expand All @@ -506,7 +506,7 @@ class Cmd: public std::enable_shared_from_this<Cmd> {
PikaCmdArgsType argv_;
std::string table_name_;

std::weak_ptr<pink::PinkConn> conn_;
std::weak_ptr<net::NetConn> conn_;
std::weak_ptr<std::string> resp_;
CmdStage stage_;
uint64_t do_duration_;
Expand Down
2 changes: 1 addition & 1 deletion include/pika_consensus.h
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ class ConsensusCoordinator {
}
std::shared_ptr<Cmd> cmd_ptr;
};
static int InitCmd(pink::RedisParser* parser, const pink::RedisCmdArgsType& argv);
static int InitCmd(net::RedisParser* parser, const net::RedisCmdArgsType& argv);

std::string ToStringStatus() {
std::stringstream tmp_stream;
Expand Down
4 changes: 2 additions & 2 deletions include/pika_define.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
#include <set>
#include <glog/logging.h>

#include "pink/include/redis_cli.h"
#include "net/include/redis_cli.h"

#define PIKA_SYNC_BUFFER_SIZE 1000
#define PIKA_MAX_WORKER_THREAD_NUM 24
Expand Down Expand Up @@ -440,7 +440,7 @@ struct SlowlogEntry {
int64_t id;
int64_t start_time;
int64_t duration;
pink::RedisCmdArgsType argv;
net::RedisCmdArgsType argv;
};

#define PIKA_MIN_RESERVED_FDS 5000
Expand Down
18 changes: 9 additions & 9 deletions include/pika_dispatch_thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,30 +25,30 @@ class PikaDispatchThread {
}

private:
class ClientConnFactory : public pink::ConnFactory {
class ClientConnFactory : public net::ConnFactory {
public:
explicit ClientConnFactory(int max_conn_rbuf_size)
: max_conn_rbuf_size_(max_conn_rbuf_size) {
}
virtual std::shared_ptr<pink::PinkConn> NewPinkConn(
virtual std::shared_ptr<net::NetConn> NewNetConn(
int connfd,
const std::string &ip_port,
pink::Thread* server_thread,
net::Thread* server_thread,
void* worker_specific_data,
pink::PinkEpoll* pink_epoll) const {
return std::static_pointer_cast<pink::PinkConn>
(std::make_shared<PikaClientConn>(connfd, ip_port, server_thread, pink_epoll, pink::HandleType::kAsynchronous, max_conn_rbuf_size_));
net::NetEpoll* net_epoll) const {
return std::static_pointer_cast<net::NetConn>
(std::make_shared<PikaClientConn>(connfd, ip_port, server_thread, net_epoll, net::HandleType::kAsynchronous, max_conn_rbuf_size_));
}
private:
int max_conn_rbuf_size_;
};

class Handles : public pink::ServerHandle {
class Handles : public net::ServerHandle {
public:
explicit Handles(PikaDispatchThread* pika_disptcher)
: pika_disptcher_(pika_disptcher) {
}
using pink::ServerHandle::AccessHandle;
using net::ServerHandle::AccessHandle;
bool AccessHandle(std::string& ip) const override;
void CronHandle() const override;

Expand All @@ -58,6 +58,6 @@ class PikaDispatchThread {

ClientConnFactory conn_factory_;
Handles handles_;
pink::ServerThread* thread_rep_;
net::ServerThread* thread_rep_;
};
#endif
6 changes: 3 additions & 3 deletions include/pika_monitor_thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,13 @@
#include <queue>
#include <atomic>

#include "pink/include/pink_thread.h"
#include "net/include/net_thread.h"
#include "pstd/include/pstd_mutex.h"

#include "include/pika_define.h"
#include "include/pika_client_conn.h"

class PikaMonitorThread : public pink::Thread {
class PikaMonitorThread : public net::Thread {
public:
PikaMonitorThread();
virtual ~PikaMonitorThread();
Expand All @@ -31,7 +31,7 @@ class PikaMonitorThread : public pink::Thread {
private:
void AddCronTask(MonitorCronTask task);
bool FindClient(const std::string& ip_port);
pink::WriteStatus SendMessage(int32_t fd, std::string& message);
net::WriteStatus SendMessage(int32_t fd, std::string& message);
void RemoveMonitorClient(const std::string& ip_port);

std::atomic<bool> has_monitor_clients_;
Expand Down
Loading