Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Mod: check redis client connection got fin or not #30

Merged
merged 1 commit into from
Sep 9, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions pink/include/server_thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,10 @@ class ServerThread : public Thread {
virtual void HandleConnEvent(PinkFiredEvent *pfe) = 0;
};

// !!!Attention: If u use this constructor, the keepalive_timeout_ will
// be equal to kDefaultKeepAliveTime(60s). In master-slave mode, the slave
// binlog receiver will close the binlog sync connection in HolyThread::DoCronTask
// if master did not send data in kDefaultKeepAliveTime.
extern ServerThread *NewHolyThread(
int port,
ConnFactory *conn_factory,
Expand Down
54 changes: 54 additions & 0 deletions pink/src/redis_cli.cc
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,20 @@
#include <fcntl.h>
#include <unistd.h>
#include <stdarg.h>
#include <poll.h>

#include <string>
#include <vector>
#include <atomic>

#include "pink/include/pink_define.h"
#include "pink/include/pink_cli.h"


namespace pink {

static time_t kCheckDiff = 1;

class RedisCli : public PinkCli {
public:
RedisCli();
Expand All @@ -29,6 +33,8 @@ class RedisCli : public PinkCli {

// Read, parse and store the reply
virtual Status Recv(void *result = NULL);
// Check whether the connection got fin from peer or not
virtual int CheckAliveness(void);

private:
RedisCmdArgsType argv_; // The parsed result
Expand All @@ -39,6 +45,7 @@ class RedisCli : public PinkCli {
int32_t rbuf_offset_;
int elements_; // the elements number of this current reply
int err_;
std::atomic<time_t> last_write_time_;

int GetReply();
int GetReplyFromReader();
Expand Down Expand Up @@ -76,14 +83,51 @@ RedisCli::RedisCli()
: rbuf_size_(REDIS_IOBUF_LEN),
rbuf_pos_(0),
rbuf_offset_(0),
last_write_time_(0),
err_(REDIS_OK) {
rbuf_ = reinterpret_cast<char*>(malloc(sizeof(char) * rbuf_size_));

last_write_time_ = time(NULL);
}

RedisCli::~RedisCli() {
free(rbuf_);
}

static inline int pollFd(int fd, int events, int ms) {
pollfd fds[1];
fds[0].fd = fd;
fds[0].events = events;
fds[0].revents = 0;

int ret = poll(fds, 1, ms);
if (ret > 0) {
return fds[0].revents;
}

return ret;
}

int RedisCli::CheckAliveness() {
char buf[1];

int ret = 0;
ret = pollFd(fd(), POLLIN | POLLPRI, 0);
if (0 < ret) {
int num = pread(fd(), buf, 1, MSG_PEEK);
if (num == 0) {
return -1;
}
if (num == -1) {
if (errno != EINTR && errno != EAGAIN && errno != EWOULDBLOCK) {
return -1;
}
}
}

return 0;
}

// We use passed-in send buffer here
Status RedisCli::Send(void *msg) {
Status s;
Expand All @@ -97,6 +141,16 @@ Status RedisCli::Send(void *msg) {

ssize_t nwritten;

time_t now;
time(&now);
if (kCheckDiff < now - last_write_time_) {
int ret = CheckAliveness();
if (ret < 0) {
return Status::IOError("connection closed");
}
last_write_time_ = now;
}

while (nleft > 0) {
if ((nwritten = write(fd(), wbuf + wbuf_pos, nleft)) <= 0) {
if (errno == EINTR) {
Expand Down