Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

do script in thread & fix lua script path #1

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -68,3 +68,5 @@ nutcracker

Makefile
Makefile.in

/output
9 changes: 7 additions & 2 deletions src/lua/pool.lua
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ ffi.cdef[[
void ffi_server_table_delete(struct server_pool *pool, const char *name);

rstatus_t ffi_stats_reset(struct server_pool *pool);

void ffi_slots_lock(struct server_pool *pool);
void ffi_slots_unlock(struct server_pool *pool);
]]

local server = require("server")
Expand Down Expand Up @@ -42,7 +45,7 @@ function _M.fetch_server(self, config)
else
s = table.remove(self._se_pool, 1)
end

return s
end

Expand All @@ -59,7 +62,7 @@ function _M.fetch_replica_set(self)
else
rs = table.remove(self._rs_pool, 1)
end

return rs
end

Expand Down Expand Up @@ -167,9 +170,11 @@ function _M.build_replica_sets(self)
end

function _M.bind_slots(self)
C.ffi_slots_lock(__pool)
for _,rs in ipairs(self.replica_sets) do
rs:bind_slots()
end
C.ffi_slots_unlock(__pool)
end

return _M
43 changes: 40 additions & 3 deletions src/nc_script.c
Original file line number Diff line number Diff line change
Expand Up @@ -194,8 +194,20 @@ ffi_server_disconnect(struct server *server)
}

void
ffi_slots_set_replicaset(struct server_pool *pool,
struct replicaset *rs,
ffi_slots_lock(struct server_pool *pool) {
log_debug(LOG_VERB, "lock slots to update");
pthread_mutex_lock(&pool->slots_mutex);
}

void
ffi_slots_unlock(struct server_pool *pool) {
log_debug(LOG_VERB, "unlock slots. update done");
pthread_mutex_unlock(&pool->slots_mutex);
}

void
ffi_slots_set_replicaset(struct server_pool *pool,
struct replicaset *rs,
int left, int right)
{
int i;
Expand Down Expand Up @@ -262,7 +274,28 @@ script_init(struct server_pool *pool)
L = luaL_newstate(); /* Create Lua state variable */
pool->L = L;
luaL_openlibs(L); /* Load Lua libraries */
if (luaL_loadfile(L, "lua/redis.lua")) {
#define MAX_PATH_LEN 1000
char path[MAX_PATH_LEN] = {'\0'};
char current[MAX_PATH_LEN] = {'\0'};
char *lua_name = "lua/redis.lua";

if (get_my_path(path, sizeof(path)) == NC_ERROR) {
log_error("get_my_path filed:%s", strerror(errno));
return NC_ERROR;
}

dirname(path);

log_debug(LOG_VERB, "lua script path is %s", path);

getcwd(current,sizeof(current));

if (chdir(path) < 0) {
log_error("chdir failed: %s", path);
return NC_ERROR;
}

if (luaL_loadfile(L, lua_name)) {
log_debug(LOG_VERB, "init lua script failed - %s", lua_tostring(L, -1));
return NC_ERROR;
}
Expand All @@ -277,6 +310,10 @@ script_init(struct server_pool *pool)
if (lua_pcall(L, 0, 0, 0) != 0) {
log_error("call lua script failed - %s", lua_tostring(L, -1));
}
if (chdir(current) < 0) {
log_error("chdir failed: %s", current);
return NC_ERROR;
}

return NC_OK;
}
Expand Down
56 changes: 53 additions & 3 deletions src/nc_server.c
Original file line number Diff line number Diff line change
Expand Up @@ -852,6 +852,50 @@ server_pool_each_run(void *elem, void *data)
return server_pool_run(elem);
}

static
void *server_script_thread(void *elem) {
struct server_pool *sp = elem;
int64_t t_start, t_end;
sleep(2);
for(;;) {
char buf[1];
if (1 != read(sp->noti_fd[0], buf, sizeof(buf))) {
return NULL;
}
log_debug(LOG_VERB, "in server_scrip_thread: script_call");
t_start = nc_usec_now();
script_call(sp, sp->mbuf_thread->start, sp->mbuf_thread->last - sp->mbuf_thread->start,
"update_cluster_nodes");
t_end = nc_usec_now();
log_debug(LOG_VERB, "update slots done in %lldus",t_end - t_start);
}
}

/* init create the thread to run script and init a mutex*/
static rstatus_t
server_pool_each_script_thread(void *elem, void *data)
{
struct server_pool *sp = elem;

if (pthread_mutex_init(&sp->slots_mutex, NULL) != 0) {
log_debug(LOG_WARN, "pthread_mutex_init failed");
return NC_ERROR;
}
/* create a pipe to notify */
if (pipe(sp->noti_fd) != 0) {
log_debug(LOG_WARN, "pipe failed");
return NC_ERROR;
}

/* start the thread*/
if (pthread_create(&sp->scri_thread, NULL, server_script_thread, (void *)sp)) {
log_debug(LOG_WARN, "pthread create failed");
return NC_ERROR;
}

return NC_OK;
}

static rstatus_t
server_pool_each_set_table(void *elem, void *data)
{
Expand Down Expand Up @@ -903,6 +947,12 @@ server_pool_init(struct array *server_pool, struct array *conf_pool,
return status;
}

/* init slots mutex */
status = array_each(server_pool, server_pool_each_script_thread, ctx);
if (status != NC_OK) {
return status;
}

/* set tick callback */
status = array_each(server_pool, server_pool_each_set_tick_callback, ctx);
if (status != NC_OK) {
Expand Down Expand Up @@ -966,13 +1016,13 @@ server_pool_each_tick(void *elem, void *data)
struct server_pool *pool = elem;

pool->pool_tick(pool);

/* always returns NC_OK */
return NC_OK;
}

void
server_pool_tick(struct context *ctx)
void
server_pool_tick(struct context *ctx)
{
struct array *pools;

Expand Down
4 changes: 4 additions & 0 deletions src/nc_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,10 @@ struct server_pool {
struct string zone; /* avaliablity zone */
struct hash_table *server_table; /* address(ip:port) to server map */
struct replicaset *slots[REDIS_CLUSTER_SLOTS];
pthread_mutex_t slots_mutex; /* mutex to access the slot */
pthread_t scri_thread;
int noti_fd[2]; /* pipe fd to notify thread */
struct mbuf *mbuf_thread; /* set the value before call the script thread */
lua_State *L;
};

Expand Down
8 changes: 8 additions & 0 deletions src/nc_util.c
Original file line number Diff line number Diff line change
Expand Up @@ -634,3 +634,11 @@ nc_unresolve_desc(int sd)

return nc_unresolve_addr(addr, addrlen);
}

int
get_my_path(char *buf, int len) {
if (buf == NULL) {
return NC_ERROR;
}
return readlink("/proc/self/exe", buf, len);
}
3 changes: 3 additions & 0 deletions src/nc_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -212,4 +212,7 @@ char *nc_unresolve_addr(struct sockaddr *addr, socklen_t addrlen);
char *nc_unresolve_peer_desc(int sd);
char *nc_unresolve_desc(int sd);

/* return binary path in buf and len */
int get_my_path(char *buf, int len);

#endif
24 changes: 15 additions & 9 deletions src/proto/nc_redis.c
Original file line number Diff line number Diff line change
Expand Up @@ -2789,15 +2789,20 @@ redis_routing(struct context *ctx, struct server_pool *pool,
struct server *server = NULL;

idx = server_pool_hash(pool, key, keylen) % REDIS_CLUSTER_SLOTS;

if (msg->type > MSG_REQ_REDIS_WRITECMD_START) {
pthread_mutex_lock(&pool->slots_mutex);
server = pool->slots[idx]->master;
pthread_mutex_unlock(&pool->slots_mutex);
} else {
for (i = 0; i < NC_MAXTAGNUM; i++) {
uint32_t n;
struct array *slaves;

pthread_mutex_lock(&pool->slots_mutex);
slaves = &pool->slots[idx]->tagged_servers[i];
pthread_mutex_unlock(&pool->slots_mutex);

if (array_n(slaves) == 0) {
continue;
}
Expand Down Expand Up @@ -2949,7 +2954,9 @@ redis_pre_rsp_forward(struct context *ctx, struct conn * s_conn, struct msg *msg
keylen = (uint32_t)(kpos->end - kpos->start);
idx = pool->key_hash((char*)key, keylen) % REDIS_CLUSTER_SLOTS;
if (msg->integer != idx) {
pthread_mutex_lock(&pool->slots_mutex);
pool->slots[idx] = pool->slots[msg->integer];
pthread_mutex_unlock(&pool->slots_mutex);
}
}

Expand All @@ -2958,7 +2965,7 @@ redis_pre_rsp_forward(struct context *ctx, struct conn * s_conn, struct msg *msg

ferror:
log_debug(LOG_WARN, "server: failed to redirect message");

msg_put(pmsg);
msg_put(msg);
return NC_ERROR;
Expand All @@ -2969,16 +2976,15 @@ redis_pre_rsp_forward(struct context *ctx, struct conn * s_conn, struct msg *msg
int64_t t_start, t_end;
struct mbuf *mbuf;

server = s_conn->owner;
pool = server->owner;

/* FIXME: check length */
mbuf = STAILQ_FIRST(&msg->mhdr);
t_start = nc_usec_now();
script_call(pool, mbuf->start, mbuf->last - mbuf->start, "update_cluster_nodes");
t_end = nc_usec_now();
log_debug(LOG_VERB, "update slots done in %lldus", t_end - t_start);

pool->mbuf_thread = mbuf;
req_put(pmsg);

if (write(pool->noti_fd[1], "1", 1) != 1) {
log_debug(LOG_WARN, "write to pipe failed");
}
return NC_ERROR;
}

Expand Down