diff --git a/.gitignore b/.gitignore index 657581e..0830328 100644 --- a/.gitignore +++ b/.gitignore @@ -68,3 +68,5 @@ nutcracker Makefile Makefile.in + +/output diff --git a/src/lua/pool.lua b/src/lua/pool.lua index 3b60e71..e8b0682 100644 --- a/src/lua/pool.lua +++ b/src/lua/pool.lua @@ -14,6 +14,9 @@ ffi.cdef[[ void ffi_server_table_delete(struct server_pool *pool, const char *name); rstatus_t ffi_stats_reset(struct server_pool *pool); + + void ffi_slots_lock(struct server_pool *pool); + void ffi_slots_unlock(struct server_pool *pool); ]] local server = require("server") @@ -42,7 +45,7 @@ function _M.fetch_server(self, config) else s = table.remove(self._se_pool, 1) end - + return s end @@ -59,7 +62,7 @@ function _M.fetch_replica_set(self) else rs = table.remove(self._rs_pool, 1) end - + return rs end @@ -167,9 +170,11 @@ function _M.build_replica_sets(self) end function _M.bind_slots(self) + C.ffi_slots_lock(__pool) for _,rs in ipairs(self.replica_sets) do rs:bind_slots() end + C.ffi_slots_unlock(__pool) end return _M diff --git a/src/nc_script.c b/src/nc_script.c index 5b2ea0a..c852719 100644 --- a/src/nc_script.c +++ b/src/nc_script.c @@ -194,8 +194,20 @@ ffi_server_disconnect(struct server *server) } void -ffi_slots_set_replicaset(struct server_pool *pool, - struct replicaset *rs, +ffi_slots_lock(struct server_pool *pool) { + log_debug(LOG_VERB, "lock slots to update"); + pthread_mutex_lock(&pool->slots_mutex); +} + +void +ffi_slots_unlock(struct server_pool *pool) { + log_debug(LOG_VERB, "unlock slots. update done"); + pthread_mutex_unlock(&pool->slots_mutex); +} + +void +ffi_slots_set_replicaset(struct server_pool *pool, + struct replicaset *rs, int left, int right) { int i; @@ -262,7 +274,28 @@ script_init(struct server_pool *pool) L = luaL_newstate(); /* Create Lua state variable */ pool->L = L; luaL_openlibs(L); /* Load Lua libraries */ - if (luaL_loadfile(L, "lua/redis.lua")) { + #define MAX_PATH_LEN 1000 + char path[MAX_PATH_LEN] = {'\0'}; + char current[MAX_PATH_LEN] = {'\0'}; + char *lua_name = "lua/redis.lua"; + + if (get_my_path(path, sizeof(path)) == NC_ERROR) { + log_error("get_my_path filed:%s", strerror(errno)); + return NC_ERROR; + } + + dirname(path); + + log_debug(LOG_VERB, "lua script path is %s", path); + + getcwd(current,sizeof(current)); + + if (chdir(path) < 0) { + log_error("chdir failed: %s", path); + return NC_ERROR; + } + + if (luaL_loadfile(L, lua_name)) { log_debug(LOG_VERB, "init lua script failed - %s", lua_tostring(L, -1)); return NC_ERROR; } @@ -277,6 +310,10 @@ script_init(struct server_pool *pool) if (lua_pcall(L, 0, 0, 0) != 0) { log_error("call lua script failed - %s", lua_tostring(L, -1)); } + if (chdir(current) < 0) { + log_error("chdir failed: %s", current); + return NC_ERROR; + } return NC_OK; } diff --git a/src/nc_server.c b/src/nc_server.c index 3967a86..4bbb378 100644 --- a/src/nc_server.c +++ b/src/nc_server.c @@ -852,6 +852,50 @@ server_pool_each_run(void *elem, void *data) return server_pool_run(elem); } +static +void *server_script_thread(void *elem) { + struct server_pool *sp = elem; + int64_t t_start, t_end; + sleep(2); + for(;;) { + char buf[1]; + if (1 != read(sp->noti_fd[0], buf, sizeof(buf))) { + return NULL; + } + log_debug(LOG_VERB, "in server_scrip_thread: script_call"); + t_start = nc_usec_now(); + script_call(sp, sp->mbuf_thread->start, sp->mbuf_thread->last - sp->mbuf_thread->start, + "update_cluster_nodes"); + t_end = nc_usec_now(); + log_debug(LOG_VERB, "update slots done in %lldus",t_end - t_start); + } +} + +/* init create the thread to run script and init a mutex*/ +static rstatus_t +server_pool_each_script_thread(void *elem, void *data) +{ + struct server_pool *sp = elem; + + if (pthread_mutex_init(&sp->slots_mutex, NULL) != 0) { + log_debug(LOG_WARN, "pthread_mutex_init failed"); + return NC_ERROR; + } + /* create a pipe to notify */ + if (pipe(sp->noti_fd) != 0) { + log_debug(LOG_WARN, "pipe failed"); + return NC_ERROR; + } + + /* start the thread*/ + if (pthread_create(&sp->scri_thread, NULL, server_script_thread, (void *)sp)) { + log_debug(LOG_WARN, "pthread create failed"); + return NC_ERROR; + } + + return NC_OK; +} + static rstatus_t server_pool_each_set_table(void *elem, void *data) { @@ -903,6 +947,12 @@ server_pool_init(struct array *server_pool, struct array *conf_pool, return status; } + /* init slots mutex */ + status = array_each(server_pool, server_pool_each_script_thread, ctx); + if (status != NC_OK) { + return status; + } + /* set tick callback */ status = array_each(server_pool, server_pool_each_set_tick_callback, ctx); if (status != NC_OK) { @@ -966,13 +1016,13 @@ server_pool_each_tick(void *elem, void *data) struct server_pool *pool = elem; pool->pool_tick(pool); - + /* always returns NC_OK */ return NC_OK; } -void -server_pool_tick(struct context *ctx) +void +server_pool_tick(struct context *ctx) { struct array *pools; diff --git a/src/nc_server.h b/src/nc_server.h index 0997aac..03a310e 100644 --- a/src/nc_server.h +++ b/src/nc_server.h @@ -143,6 +143,10 @@ struct server_pool { struct string zone; /* avaliablity zone */ struct hash_table *server_table; /* address(ip:port) to server map */ struct replicaset *slots[REDIS_CLUSTER_SLOTS]; + pthread_mutex_t slots_mutex; /* mutex to access the slot */ + pthread_t scri_thread; + int noti_fd[2]; /* pipe fd to notify thread */ + struct mbuf *mbuf_thread; /* set the value before call the script thread */ lua_State *L; }; diff --git a/src/nc_util.c b/src/nc_util.c index a300faf..ad5827e 100644 --- a/src/nc_util.c +++ b/src/nc_util.c @@ -634,3 +634,11 @@ nc_unresolve_desc(int sd) return nc_unresolve_addr(addr, addrlen); } + +int +get_my_path(char *buf, int len) { + if (buf == NULL) { + return NC_ERROR; + } + return readlink("/proc/self/exe", buf, len); +} diff --git a/src/nc_util.h b/src/nc_util.h index f718532..80d5a4d 100644 --- a/src/nc_util.h +++ b/src/nc_util.h @@ -212,4 +212,7 @@ char *nc_unresolve_addr(struct sockaddr *addr, socklen_t addrlen); char *nc_unresolve_peer_desc(int sd); char *nc_unresolve_desc(int sd); +/* return binary path in buf and len */ +int get_my_path(char *buf, int len); + #endif diff --git a/src/proto/nc_redis.c b/src/proto/nc_redis.c index b2a1a34..57546ee 100644 --- a/src/proto/nc_redis.c +++ b/src/proto/nc_redis.c @@ -2789,15 +2789,20 @@ redis_routing(struct context *ctx, struct server_pool *pool, struct server *server = NULL; idx = server_pool_hash(pool, key, keylen) % REDIS_CLUSTER_SLOTS; - + if (msg->type > MSG_REQ_REDIS_WRITECMD_START) { + pthread_mutex_lock(&pool->slots_mutex); server = pool->slots[idx]->master; + pthread_mutex_unlock(&pool->slots_mutex); } else { for (i = 0; i < NC_MAXTAGNUM; i++) { uint32_t n; struct array *slaves; + pthread_mutex_lock(&pool->slots_mutex); slaves = &pool->slots[idx]->tagged_servers[i]; + pthread_mutex_unlock(&pool->slots_mutex); + if (array_n(slaves) == 0) { continue; } @@ -2949,7 +2954,9 @@ redis_pre_rsp_forward(struct context *ctx, struct conn * s_conn, struct msg *msg keylen = (uint32_t)(kpos->end - kpos->start); idx = pool->key_hash((char*)key, keylen) % REDIS_CLUSTER_SLOTS; if (msg->integer != idx) { + pthread_mutex_lock(&pool->slots_mutex); pool->slots[idx] = pool->slots[msg->integer]; + pthread_mutex_unlock(&pool->slots_mutex); } } @@ -2958,7 +2965,7 @@ redis_pre_rsp_forward(struct context *ctx, struct conn * s_conn, struct msg *msg ferror: log_debug(LOG_WARN, "server: failed to redirect message"); - + msg_put(pmsg); msg_put(msg); return NC_ERROR; @@ -2969,16 +2976,15 @@ redis_pre_rsp_forward(struct context *ctx, struct conn * s_conn, struct msg *msg int64_t t_start, t_end; struct mbuf *mbuf; - server = s_conn->owner; - pool = server->owner; - /* FIXME: check length */ mbuf = STAILQ_FIRST(&msg->mhdr); - t_start = nc_usec_now(); - script_call(pool, mbuf->start, mbuf->last - mbuf->start, "update_cluster_nodes"); - t_end = nc_usec_now(); - log_debug(LOG_VERB, "update slots done in %lldus", t_end - t_start); + + pool->mbuf_thread = mbuf; req_put(pmsg); + + if (write(pool->noti_fd[1], "1", 1) != 1) { + log_debug(LOG_WARN, "write to pipe failed"); + } return NC_ERROR; }