Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor writev / add test mock for C io, epoll #3

Merged
merged 1 commit into from
Jul 21, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 12 additions & 8 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -5,32 +5,36 @@ else
endif

WORKDIR=.
OBJDIR=./build

include misc/mf-template.mk

all:main.d core_objs utilities libs_3rdparty
$(LINK) main.o utils/*.o core/*.o \
$(WORK_LIBS) $(SLINK) \
-o cerberus
all:main_exec
@echo "Done"

runtest:core_objs utilities libs_3rdparty
rm -f tmp.*.txt
main_exec:core_objs utilities libs_3rdparty main.d
$(LINK) utils/*.o $(OBJDIR)/*.o $(WORK_LIBS) $(SLINK) -o cerberus

runtest:main_exec utilities libs_3rdparty
@make -f test/Makefile MODE=$(MODE) COMPILER=$(COMPILER) \
CHECK_MEM=$(CHECK_MEM)

utilities:
@make -f utils/Makefile MODE=$(MODE) COMPILER=$(COMPILER)

kill-test-server:
@python test/cluster_launcher.py kill

core_objs:
@make -f core/Makefile MODE=$(MODE) COMPILER=$(COMPILER)
@mkdir -p $(OBJDIR)
@make -f core/Makefile OBJDIR=$(OBJDIR) MODE=$(MODE) COMPILER=$(COMPILER)

libs_3rdparty:
@mkdir -p $(LIBS_DIR)
@make -f backtracpp/Makefile LIB_DIR=$(LIBS_DIR) REL_PATH=backtracpp

clean:
rm -f tmp.*
find -type f -name "*.o" -exec rm {} \;
rm -f cerberus
rm -f test/*.out
rm -rf $(LIBS_DIR)
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ Run
The argument is path of a configuration file, which should contains at least

* bind : (integer) local port to listen
* node : (address) one of active node in a cluster; format should be host:port
* node : (address, optional) one of active node in a cluster; format should be *host:port*; could also set after cerberus launched, via the `SETREMOTES` command, see it below
* thread: (integer) number of threads
* read-slave: (optional, default off) set to "1" to turn on read slave mode. A proxy in read-slave mode won't support writing commands like `SET`, `INCR`, `PUBLISH`, and it would select slave nodes for reading commands if possible. For more information please read [here (CN)](https://github.com/HunanTV/redis-cerberus/wiki/%E8%AF%BB%E5%86%99%E5%88%86%E7%A6%BB).

Expand Down
2 changes: 1 addition & 1 deletion common.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

#include "utils/typetraits.hpp"

#define VERSION "0.6.5-2015-05-25"
#define VERSION "0.6.7-2015-07-16"

namespace cerb {

Expand Down
6 changes: 3 additions & 3 deletions core/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ WORKDIR=core

include misc/mf-template.mk

core:concurrence.d buffer.d command.d response.d fdutil.d connection.d server.d \
client.d slot_map.d slot_calc.d proxy.d subscription.d exceptions.d \
globals.d stats.d
core:concurrence.d buffer.d message.d command.d response.d fdutil.d globals.d \
connection.d server.d client.d subscription.d slot_map.d slot_calc.d \
proxy.d acceptor.d stats.d
true
33 changes: 33 additions & 0 deletions core/acceptor.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
#include "acceptor.hpp"
#include "except/exceptions.hpp"
#include "syscalls/fctl.h"
#include "syscalls/poll.h"

using namespace cerb;

Acceptor::Acceptor(util::sref<Proxy> p, int listen_port)
: Connection(fctl::new_stream_socket())
, _proxy(p)
{
fctl::set_nonblocking(this->fd);
fctl::bind_to(this->fd, listen_port);
poll::poll_add_read(p->epfd, this->fd, this);
}

void Acceptor::on_events(int)
{
int cfd;
while ((cfd = cio::accept(this->fd)) > 0)
{
fctl::set_nonblocking(cfd);
fctl::set_tcpnodelay(cfd);
this->_proxy->new_client(cfd);
}
if (cfd == -1) {
if (errno != EAGAIN && errno != ECONNABORTED
&& errno != EPROTO && errno != EINTR)
{
throw SocketAcceptError(errno);
}
}
}
22 changes: 22 additions & 0 deletions core/acceptor.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
#ifndef __CERBERUS_ACCEPTOR_HPP__
#define __CERBERUS_ACCEPTOR_HPP__

#include "connection.hpp"
#include "proxy.hpp"

namespace cerb {

class Acceptor
: public Connection
{
util::sref<Proxy> const _proxy;
public:
Acceptor(util::sref<Proxy> p, int listen_port);

void on_events(int);
void on_error() {}
};

}

#endif /* __CERBERUS_ACCEPTOR_HPP__ */
137 changes: 80 additions & 57 deletions core/buffer.cpp
Original file line number Diff line number Diff line change
@@ -1,16 +1,14 @@
#include <climits>
#include <unistd.h>
#include <sys/uio.h>
#include <algorithm>

#include "buffer.hpp"
#include "exceptions.hpp"
#include "except/exceptions.hpp"
#include "utils/logging.hpp"

using namespace cerb;

static int const BUFFER_SIZE = 16 * 1024;
static int const WRITEV_CHUNK_SIZE = 2 * 1024 * 1024;
static int const WRITEV_MAX_SIZE = 2 * 1024 * 1024;

static void on_error(std::string const& message)
{
Expand All @@ -28,19 +26,17 @@ static void on_error(std::string const& message)
Buffer Buffer::from_string(std::string const& s)
{
Buffer b;
std::for_each(s.begin(), s.end(),
[&](char ch)
{
b._buffer.push_back(byte(ch));
});
for (char ch: s) {
b._buffer.push_back(byte(ch));
}
return std::move(b);
}

int Buffer::read(int fd)
{
byte local[BUFFER_SIZE];
int n = 0, nread;
while ((nread = ::read(fd, local, BUFFER_SIZE)) > 0) {
while ((nread = cio::read(fd, local, BUFFER_SIZE)) > 0) {
n += nread;
this->_buffer.insert(this->_buffer.end(), local, local + nread);
}
Expand All @@ -54,15 +50,13 @@ int Buffer::write(int fd) const
{
size_type n = 0;
while (n < _buffer.size()) {
int nwrite = ::write(fd, _buffer.data() + n, _buffer.size() - n);
int nwrite = cio::write(fd, _buffer.data() + n, _buffer.size() - n);
if (nwrite == -1) {
on_error("buffer write");
continue;
}
LOG(DEBUG) << "Write to " << fd << " : " << nwrite << " bytes written";
n += nwrite;
}
LOG(DEBUG) << "Total written " << fd << " : " << n << " bytes";
return n;
}

Expand All @@ -71,10 +65,10 @@ void Buffer::truncate_from_begin(iterator i)
this->_buffer.erase(_buffer.begin(), i);
}

void Buffer::buffer_ready(std::vector<struct iovec>& iov)
void Buffer::buffer_ready(std::vector<cio::iovec>& iov)
{
if (!_buffer.empty()) {
struct iovec v = {_buffer.data(), size_t(_buffer.size())};
cio::iovec v = {_buffer.data(), size_t(_buffer.size())};
LOG(DEBUG) << "Push iov " << reinterpret_cast<void*>(_buffer.data()) << ' ' << _buffer.size();
iov.push_back(v);
}
Expand Down Expand Up @@ -108,58 +102,87 @@ bool Buffer::same_as_string(std::string const& s) const
});
}

static void write_vec(int fd, int iovcnt, struct iovec* iov, ssize_t total)
static int write_single(int fd, byte const* buf, int buf_len, int* offset)
{
LOG(DEBUG) << "*writev to " << fd << " iovcnt=" << iovcnt << " total bytes=" << total;
ssize_t written;
while (total != (written = ::writev(fd, iov, iovcnt))) {
if (written == -1) {
on_error("buffer writev");
continue;
}
LOG(DEBUG) << "*writev partial written bytes=" << written << " need to write=" << total;
total -= written;
while (iov->iov_len <= size_t(written)) {
written -= iov->iov_len;
++iov;
--iovcnt;
while (*offset < buf_len) {
ssize_t nwritten = cio::write(fd, buf + *offset, buf_len - *offset);
if (nwritten == -1) {
on_error("buffer write");
return 0;
}
iov->iov_base = reinterpret_cast<byte*>(iov->iov_base) + written;
iov->iov_len -= written;
LOG(DEBUG) << "Write to " << fd << " : " << nwritten << " bytes written";
*offset += nwritten;
}
return 1;
}

void Buffer::writev(int fd, std::vector<util::sref<Buffer>> const& arr)
static int write_vec(int fd, int iovcnt, cio::iovec* iov, ssize_t total, int* first_offset)
{
struct iovec vec[IOV_MAX];
int iov_index = 0;
size_type bulk_write_size = 0;
if (arr.size() == 1) {
arr[0]->write(fd);
return;
if (1 == iovcnt) {
return write_single(fd, reinterpret_cast<byte*>(iov->iov_base),
iov->iov_len, first_offset);
}
for (auto b: arr) {
if (iov_index == IOV_MAX
|| bulk_write_size + b->size() > WRITEV_CHUNK_SIZE)
{
write_vec(fd, iov_index, vec, bulk_write_size);
iov_index = 0;
bulk_write_size = 0;

LOG(DEBUG) << "*writev to " << fd << " iovcnt=" << iovcnt << " total bytes=" << total;
iov[0].iov_base = reinterpret_cast<byte*>(iov->iov_base) + *first_offset;
iov->iov_len -= *first_offset;
int written_iov = 0;
ssize_t nwritten;
while (total != (nwritten = cio::writev(fd, iov + written_iov, iovcnt - written_iov))) {
if (nwritten == 0) {
return written_iov;
}
if (b->size() > WRITEV_CHUNK_SIZE) {
b->write(fd);
continue;
if (nwritten == -1) {
on_error("buffer writev");
return written_iov;
}
vec[iov_index].iov_base = b->_buffer.data();
vec[iov_index].iov_len = b->size();
++iov_index;
bulk_write_size += b->size();
LOG(DEBUG) << "*writev partial: " << nwritten << " / " << total;
total -= nwritten;
while (iov[written_iov].iov_len <= size_t(nwritten)) {
nwritten -= iov[written_iov].iov_len;
++written_iov;
*first_offset = 0;
}
iov[written_iov].iov_base = reinterpret_cast<byte*>(iov[written_iov].iov_base) + nwritten;
iov[written_iov].iov_len -= nwritten;
*first_offset += nwritten;
}
if (iov_index == 1) {
arr.back()->write(fd);
return;
return iovcnt;
}

static std::pair<int, int> next_group_to_write(
std::deque<util::sref<Buffer>> const& buf_arr, int first_offset, cio::iovec* vec)
{
vec[0].iov_base = buf_arr[0]->data();
vec[0].iov_len = buf_arr[0]->size();
Buffer::size_type bulk_write_size = buf_arr[0]->size() - first_offset;
std::deque<util::sref<Buffer>>::size_type i = 1;
for (; i < buf_arr.size()
&& i < IOV_MAX
&& bulk_write_size + buf_arr[i]->size() <= WRITEV_MAX_SIZE;
++i)
{
vec[i].iov_base = buf_arr[i]->data();
vec[i].iov_len = buf_arr[i]->size();
bulk_write_size += vec[i].iov_len;
}
if (iov_index > 1) {
write_vec(fd, iov_index, vec, bulk_write_size);
return std::pair<int, int>(i, bulk_write_size);
}

bool BufferSet::writev(int fd)
{
cio::iovec vec[IOV_MAX];
while (!this->_buf_arr.empty()) {
auto x = ::next_group_to_write(this->_buf_arr, this->_1st_buf_offset, vec);
int iovcnt = x.first;
int bulk_write_size = x.second;
int iov_written = ::write_vec(fd, iovcnt, vec, bulk_write_size,
&this->_1st_buf_offset);
this->_buf_arr.erase(this->_buf_arr.begin(), this->_buf_arr.begin() + iov_written);
if (iov_written < iovcnt) {
return false;
}
this->_1st_buf_offset = 0;
}
return true;
}
32 changes: 30 additions & 2 deletions core/buffer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@
#define __CERBERUS_BUFFER_HPP__

#include <vector>
#include <deque>
#include <string>

#include "stats.hpp"
#include "utils/pointer.h"
#include "syscalls/cio.h"

namespace cerb {

Expand Down Expand Up @@ -73,16 +75,42 @@ namespace cerb {
_buffer.clear();
}

void* data()
{
return this->_buffer.data();
}

int read(int fd);
int write(int fd) const;
void truncate_from_begin(iterator i);
void buffer_ready(std::vector<struct iovec>& iov);
void buffer_ready(std::vector<cio::iovec>& iov);
void copy_from(const_iterator first, const_iterator last);
void append_from(const_iterator first, const_iterator last);
std::string to_string() const;
bool same_as_string(std::string const& s) const;
};

class BufferSet {
std::deque<util::sref<Buffer>> _buf_arr;
int _1st_buf_offset;
public:
BufferSet(BufferSet const&) = delete;

BufferSet()
: _1st_buf_offset(0)
{}

void append(util::sref<Buffer> buf)
{
this->_buf_arr.push_back(buf);
}

bool empty() const
{
return this->_buf_arr.empty();
}

static void writev(int fd, std::vector<util::sref<Buffer>> const& arr);
bool writev(int fd);
};

}
Expand Down
Loading