diff --git a/src/pm/hydra/lib/pmiserv_common.c b/src/pm/hydra/lib/pmiserv_common.c index 9db6e0864e7..eae6ffe7c5a 100644 --- a/src/pm/hydra/lib/pmiserv_common.c +++ b/src/pm/hydra/lib/pmiserv_common.c @@ -85,14 +85,16 @@ HYD_status HYD_pmcd_pmi_send(int fd, struct PMIU_cmd *pmi, struct HYD_pmcd_hdr * goto fn_exit; } -HYD_status HYD_pmcd_pmi_allocate_kvs(struct HYD_pmcd_pmi_kvs **kvs) +HYD_status HYD_pmcd_pmi_allocate_kvs(struct HYD_kvs **kvs) { HYD_status status = HYD_SUCCESS; HYDU_FUNC_ENTER(); - HYDU_MALLOC_OR_JUMP(*kvs, struct HYD_pmcd_pmi_kvs *, sizeof(struct HYD_pmcd_pmi_kvs), status); + HYDU_MALLOC_OR_JUMP(*kvs, struct HYD_kvs *, sizeof(struct HYD_kvs), status); (*kvs)->key_pair = NULL; - (*kvs)->tail = NULL; + (*kvs)->iter_end = NULL; + (*kvs)->iter_cur = NULL; + (*kvs)->iter_new_only = false; fn_exit: HYDU_FUNC_EXIT(); @@ -102,9 +104,9 @@ HYD_status HYD_pmcd_pmi_allocate_kvs(struct HYD_pmcd_pmi_kvs **kvs) goto fn_exit; } -void HYD_pmcd_free_pmi_kvs_list(struct HYD_pmcd_pmi_kvs *kvs_list) +void HYD_pmcd_free_pmi_kvs_list(struct HYD_kvs *kvs_list) { - struct HYD_pmcd_pmi_kvs_pair *key_pair, *tmp; + struct HYD_kvs_pair *key_pair, *tmp; HYDU_FUNC_ENTER(); @@ -119,16 +121,33 @@ void HYD_pmcd_free_pmi_kvs_list(struct HYD_pmcd_pmi_kvs *kvs_list) HYDU_FUNC_EXIT(); } -HYD_status HYD_pmcd_pmi_add_kvs(const char *key, const char *val, struct HYD_pmcd_pmi_kvs *kvs, - int *ret) +HYD_status HYD_kvs_find(struct HYD_kvs *kvs_list, const char *key, const char **val, int *found) { - struct HYD_pmcd_pmi_kvs_pair *key_pair; + HYDU_FUNC_ENTER(); + *found = 0; + + struct HYD_kvs_pair *run; + for (run = kvs_list->key_pair; run; run = run->next) { + if (!strcmp(run->key, key)) { + *val = run->val; + *found = 1; + goto fn_exit; + } + } + + fn_exit: + HYDU_FUNC_EXIT(); + return HYD_SUCCESS; +} + +HYD_status HYD_pmcd_pmi_add_kvs(const char *key, const char *val, struct HYD_kvs * kvs, int *ret) +{ + struct HYD_kvs_pair *key_pair; HYD_status status = HYD_SUCCESS; HYDU_FUNC_ENTER(); - HYDU_MALLOC_OR_JUMP(key_pair, struct HYD_pmcd_pmi_kvs_pair *, - sizeof(struct HYD_pmcd_pmi_kvs_pair), status); + HYDU_MALLOC_OR_JUMP(key_pair, struct HYD_kvs_pair *, sizeof(struct HYD_kvs_pair), status); snprintf(key_pair->key, PMI_MAXKEYLEN, "%s", key); snprintf(key_pair->val, PMI_MAXVALLEN, "%s", val); key_pair->next = NULL; @@ -137,25 +156,18 @@ HYD_status HYD_pmcd_pmi_add_kvs(const char *key, const char *val, struct HYD_pmc if (kvs->key_pair == NULL) { kvs->key_pair = key_pair; - kvs->tail = key_pair; } else { #ifdef PMI_KEY_CHECK - struct HYD_pmcd_pmi_kvs_pair *run, *last; - - for (run = kvs->key_pair; run; run = run->next) { - if (!strcmp(run->key, key_pair->key)) { - /* duplicate key found */ - *ret = -1; - goto fn_fail; - } - last = run; + const char *dummy_val; + int found; + HYD_kvs_find(kvs, key, &val, &found); + if (found) { + *ret = -1; + goto fn_fail; } - /* Add key_pair to end of list. */ - last->next = key_pair; -#else - kvs->tail->next = key_pair; - kvs->tail = key_pair; #endif + key_pair->next = kvs->key_pair; + kvs->key_pair = key_pair; } fn_exit: @@ -167,6 +179,35 @@ HYD_status HYD_pmcd_pmi_add_kvs(const char *key, const char *val, struct HYD_pmc goto fn_exit; } +void HYD_kvs_iter_begin(struct HYD_kvs *kvs_list, bool new_only) +{ + kvs_list->iter_begin = kvs_list->key_pair; + kvs_list->iter_cur = kvs_list->iter_begin; + kvs_list->iter_new_only = new_only; +} + +void HYD_kvs_iter_end(struct HYD_kvs *kvs_list) +{ + if (kvs_list->iter_new_only) { + kvs_list->iter_end = kvs_list->iter_begin; + } + kvs_list->iter_begin = NULL; + kvs_list->iter_new_only = false; +} + +bool HYD_kvs_iter_next(struct HYD_kvs *kvs_list, const char **key, const char **val) +{ + if (kvs_list->iter_cur == NULL || + (kvs_list->iter_new_only && kvs_list->iter_cur == kvs_list->iter_end)) { + return false; + } else { + *key = kvs_list->iter_cur->key; + *val = kvs_list->iter_cur->val; + kvs_list->iter_cur = kvs_list->iter_cur->next; + return true; + } +} + const char *HYD_pmcd_cmd_name(int cmd) { switch (cmd) { diff --git a/src/pm/hydra/lib/pmiserv_common.h b/src/pm/hydra/lib/pmiserv_common.h index b5dc1b01ac5..4fab9a5267b 100644 --- a/src/pm/hydra/lib/pmiserv_common.h +++ b/src/pm/hydra/lib/pmiserv_common.h @@ -16,15 +16,19 @@ #define PMI_MAXVALLEN (1024) /* max length of value in keyval space */ #define PMI_MAXKVSLEN (256) /* max length of various names */ -struct HYD_pmcd_pmi_kvs_pair { +struct HYD_kvs_pair { char key[PMI_MAXKEYLEN]; char val[PMI_MAXVALLEN]; - struct HYD_pmcd_pmi_kvs_pair *next; + struct HYD_kvs_pair *next; }; -struct HYD_pmcd_pmi_kvs { - struct HYD_pmcd_pmi_kvs_pair *key_pair; - struct HYD_pmcd_pmi_kvs_pair *tail; +struct HYD_kvs { + struct HYD_kvs_pair *key_pair; + /* iter fields used for HYD_pmiserv_bcast_keyvals */ + struct HYD_kvs_pair *iter_end; + struct HYD_kvs_pair *iter_begin; + struct HYD_kvs_pair *iter_cur; + bool iter_new_only; }; /* init header proxy send to server upon connection */ @@ -34,10 +38,13 @@ struct HYD_pmcd_init_hdr { int proxy_id; }; -HYD_status HYD_pmcd_pmi_allocate_kvs(struct HYD_pmcd_pmi_kvs **kvs); -void HYD_pmcd_free_pmi_kvs_list(struct HYD_pmcd_pmi_kvs *kvs_list); -HYD_status HYD_pmcd_pmi_add_kvs(const char *key, const char *val, struct HYD_pmcd_pmi_kvs *kvs, - int *ret); +HYD_status HYD_pmcd_pmi_allocate_kvs(struct HYD_kvs **kvs); +void HYD_pmcd_free_pmi_kvs_list(struct HYD_kvs *kvs_list); +HYD_status HYD_kvs_find(struct HYD_kvs *kvs_list, const char *key, const char **val, int *found); +HYD_status HYD_pmcd_pmi_add_kvs(const char *key, const char *val, struct HYD_kvs *kvs, int *ret); +void HYD_kvs_iter_begin(struct HYD_kvs *kvs_list, bool new_only); +void HYD_kvs_iter_end(struct HYD_kvs *kvs_list); +bool HYD_kvs_iter_next(struct HYD_kvs *kvs_list, const char **key, const char **val); /* ---- struct HYD_pmcd_hdr ---- */ diff --git a/src/pm/hydra/mpiexec/pmiserv_kvs.c b/src/pm/hydra/mpiexec/pmiserv_kvs.c index ac98204623f..1cebeb8a19b 100644 --- a/src/pm/hydra/mpiexec/pmiserv_kvs.c +++ b/src/pm/hydra/mpiexec/pmiserv_kvs.c @@ -56,14 +56,7 @@ HYD_status HYD_pmiserv_kvs_get(struct HYD_proxy *proxy, int process_fd, int pgid found = 1; val = pg_scratch->dead_processes; } else { - struct HYD_pmcd_pmi_kvs_pair *run; - for (run = pg_scratch->kvs->key_pair; run; run = run->next) { - if (strcmp(run->key, key) == 0) { - found = 1; - val = run->val; - break; - } - } + HYD_kvs_find(pg_scratch->kvs, key, &val, &found); } if (!found && sync) { diff --git a/src/pm/hydra/mpiexec/pmiserv_pmi.c b/src/pm/hydra/mpiexec/pmiserv_pmi.c index 37d4b966942..8782591908e 100644 --- a/src/pm/hydra/mpiexec/pmiserv_pmi.c +++ b/src/pm/hydra/mpiexec/pmiserv_pmi.c @@ -30,54 +30,45 @@ HYD_status HYD_pmiserv_pmi_reply(struct HYD_proxy * proxy, int process_fd, struc HYD_status HYD_pmiserv_bcast_keyvals(struct HYD_proxy * proxy, int process_fd) { - int keyval_count, arg_count, j; - struct HYD_pmcd_pmi_kvs_pair *run; - struct HYD_pg *pg; - struct HYD_pmcd_pmi_pg_scratch *pg_scratch; HYD_status status = HYD_SUCCESS; - HYDU_FUNC_ENTER(); + struct HYD_pg *pg; + struct HYD_pmcd_pmi_pg_scratch *pg_scratch; pg = PMISERV_pg_by_id(proxy->pgid); pg_scratch = (struct HYD_pmcd_pmi_pg_scratch *) pg->pg_scratch; - /* find the number of keyvals */ - keyval_count = 0; - for (run = pg_scratch->kvs->key_pair; run; run = run->next) - keyval_count++; - - keyval_count -= pg_scratch->keyval_dist_count; - - if (keyval_count) { - struct PMIU_cmd pmi; - PMIU_msg_set_query(&pmi, PMIU_WIRE_V1, PMIU_CMD_KVSCACHE, false /* not static */); - arg_count = 1; - for (run = pg_scratch->kvs->key_pair, j = 0; run; run = run->next, j++) { - if (j < pg_scratch->keyval_dist_count) - continue; - - PMIU_cmd_add_str(&pmi, run->key, run->val); - - arg_count++; - if (arg_count >= MAX_PMI_ARGS) { - pg_scratch->keyval_dist_count += (arg_count - 1); - for (int i = 0; i < pg->proxy_count; i++) { - status = HYD_pmiserv_pmi_reply(&pg->proxy_list[i], process_fd, &pmi); - HYDU_ERR_POP(status, "error writing PMI line\n"); - } + struct PMIU_cmd pmi; + int arg_count = 0, num_total = 0; + const char *key, *val; + HYD_kvs_iter_begin(pg_scratch->kvs, true); + while (true) { + bool has_next = HYD_kvs_iter_next(pg_scratch->kvs, &key, &val); + if (has_next) { + if (arg_count == 0) { PMIU_msg_set_query(&pmi, PMIU_WIRE_V1, PMIU_CMD_KVSCACHE, false /* not static */); arg_count = 1; } + PMIU_cmd_add_str(&pmi, key, val); + arg_count++; + num_total++; } - if (arg_count > 1) { - pg_scratch->keyval_dist_count += (arg_count - 1); + if (arg_count >= MAX_PMI_ARGS || (!has_next && arg_count > 1)) { for (int i = 0; i < pg->proxy_count; i++) { status = HYD_pmiserv_pmi_reply(&pg->proxy_list[i], process_fd, &pmi); HYDU_ERR_POP(status, "error writing PMI line\n"); } + arg_count = 0; } + + if (!has_next) { + break; + } + } + HYD_kvs_iter_end(pg_scratch->kvs); + if (num_total > 0) { PMIU_cmd_free_buf(&pmi); } diff --git a/src/pm/hydra/mpiexec/pmiserv_pmi.h b/src/pm/hydra/mpiexec/pmiserv_pmi.h index 2f356840233..83c3163c193 100644 --- a/src/pm/hydra/mpiexec/pmiserv_pmi.h +++ b/src/pm/hydra/mpiexec/pmiserv_pmi.h @@ -27,7 +27,7 @@ struct HYD_pmcd_pmi_pg_scratch { int dead_process_count; char kvsname[PMI_MAXKVSLEN]; - struct HYD_pmcd_pmi_kvs *kvs; + struct HYD_kvs *kvs; int keyval_dist_count; /* Number of keyvals distributed */ }; diff --git a/src/pm/hydra/mpiexec/pmiserv_spawn.c b/src/pm/hydra/mpiexec/pmiserv_spawn.c index ebdbbdd0f91..c512fd1af1b 100644 --- a/src/pm/hydra/mpiexec/pmiserv_spawn.c +++ b/src/pm/hydra/mpiexec/pmiserv_spawn.c @@ -17,8 +17,7 @@ static HYD_status allocate_spawn_pg(int spawner_pgid); static HYD_status fill_exec_params(struct HYD_exec *exec, const char *execname, int nprocs, int argcnt, const char **argv, int infonum, struct PMIU_token *infos); -static HYD_status fill_preput_kvs(struct HYD_pmcd_pmi_kvs *kvs, - int preput_num, struct PMIU_token *infos); +static HYD_status fill_preput_kvs(struct HYD_kvs *kvs, int preput_num, struct PMIU_token *infos); static HYD_status do_spawn(void); static char *get_exec_path(const char *execname, const char *path); @@ -205,8 +204,7 @@ static HYD_status fill_exec_params(struct HYD_exec *exec, const char *execname, goto fn_exit; } -static HYD_status fill_preput_kvs(struct HYD_pmcd_pmi_kvs *kvs, - int preput_num, struct PMIU_token *infos) +static HYD_status fill_preput_kvs(struct HYD_kvs *kvs, int preput_num, struct PMIU_token *infos) { HYD_status status = HYD_SUCCESS; diff --git a/src/pm/hydra/proxy/pmip.h b/src/pm/hydra/proxy/pmip.h index 7d53b7f7b3f..c9b3f990225 100644 --- a/src/pm/hydra/proxy/pmip.h +++ b/src/pm/hydra/proxy/pmip.h @@ -101,7 +101,7 @@ struct pmip_pg { struct HYD_exec *exec_list; /* This is for PMI-2 info-putnodeattr. Should it be per-node or per pg? */ - struct HYD_pmcd_pmi_kvs *kvs; + struct HYD_kvs *kvs; /* PMI-1 caches server kvs locally */ struct cache_put_elem cache_put; diff --git a/src/pm/hydra/proxy/pmip_pmi.c b/src/pm/hydra/proxy/pmip_pmi.c index cc4ffbe3095..b3b90df66c2 100644 --- a/src/pm/hydra/proxy/pmip_pmi.c +++ b/src/pm/hydra/proxy/pmip_pmi.c @@ -513,7 +513,8 @@ HYD_status fn_keyval_cache(struct pmip_pg *pg, struct PMIU_cmd *pmi) int i; for (i = 0; i < pg->num_elems; i++) { struct cache_elem *elem = pg->cache_get + i; - HASH_ADD_STR(pg->hash_get, key, elem, MPL_MEM_PM); + struct cache_elem *replaced; + HASH_REPLACE_STR(pg->hash_get, key, elem, replaced, MPL_MEM_PM); } for (; i < pg->num_elems + num_tokens; i++) { struct cache_elem *elem = pg->cache_get + i; @@ -521,7 +522,8 @@ HYD_status fn_keyval_cache(struct pmip_pg *pg, struct PMIU_cmd *pmi) HYDU_ERR_CHKANDJUMP(status, NULL == elem->key, HYD_INTERNAL_ERROR, "%s", ""); elem->val = MPL_strdup(tokens[i - pg->num_elems].val); HYDU_ERR_CHKANDJUMP(status, NULL == elem->val, HYD_INTERNAL_ERROR, "%s", ""); - HASH_ADD_STR(pg->hash_get, key, elem, MPL_MEM_PM); + struct cache_elem *replaced; + HASH_REPLACE_STR(pg->hash_get, key, elem, replaced, MPL_MEM_PM); } pg->num_elems += num_tokens; @@ -672,17 +674,9 @@ HYD_status fn_info_getnodeattr(struct pmip_downstream *p, struct PMIU_cmd *pmi) /* if a predefined value is not found, we let the code fall back * to regular search and return an error to the client */ + const char *val; int found; - found = 0; - - /* FIXME: wrap it in e.g. HYD_pmcd_pmi_find_kvs */ - struct HYD_pmcd_pmi_kvs_pair *run; - for (run = PMIP_pg_from_downstream(p)->kvs->key_pair; run; run = run->next) { - if (!strcmp(run->key, key)) { - found = 1; - break; - } - } + HYD_kvs_find(PMIP_pg_from_downstream(p)->kvs, key, &val, &found); if (!found && wait) { status = HYD_pmcd_pmi_v2_queue_req(p, pmi, key); @@ -692,8 +686,7 @@ HYD_status fn_info_getnodeattr(struct pmip_downstream *p, struct PMIU_cmd *pmi) struct PMIU_cmd pmi_response; if (found) { - pmi_errno = - PMIU_msg_set_response_getnodeattr(pmi, &pmi_response, is_static, run->val, true); + pmi_errno = PMIU_msg_set_response_getnodeattr(pmi, &pmi_response, is_static, val, true); } else { pmi_errno = PMIU_msg_set_response_fail(pmi, &pmi_response, is_static, 1, "not_found"); } diff --git a/src/util/mpir_pmi.c b/src/util/mpir_pmi.c index 1d37e78ed0f..7753288d7ac 100644 --- a/src/util/mpir_pmi.c +++ b/src/util/mpir_pmi.c @@ -105,6 +105,7 @@ static char *pmi_kvs_name; #elif defined USE_PMI2_API static char *pmi_jobid; #elif defined USE_PMIX_API +static int pmix_init_count = 0; static pmix_proc_t pmix_proc; static pmix_proc_t pmix_wcproc; #endif @@ -119,6 +120,7 @@ static void MPIR_pmi_finalize_on_exit(void) PMI2_Finalize(); #elif defined USE_PMIX_API PMIx_Finalize(NULL, 0); + pmix_init_count = 0; #endif } @@ -189,20 +191,26 @@ int MPIR_pmi_init(void) pmix_value_t *pvalue = NULL; - pmi_errno = PMIx_Init(&pmix_proc, NULL, 0); - if (pmi_errno == PMIX_ERR_UNREACH) { - /* no pmi server, assume we are a singleton */ - rank = 0; - size = 1; - goto singleton_out; + /* Since we only call PMIx_Finalize once at `atexit` handler, we need prevent + * calling PMIx_Init multiple times. */ + pmix_init_count++; + if (pmix_init_count == 1) { + pmi_errno = PMIx_Init(&pmix_proc, NULL, 0); + if (pmi_errno == PMIX_ERR_UNREACH) { + /* no pmi server, assume we are a singleton */ + rank = 0; + size = 1; + goto singleton_out; + } + MPIR_ERR_CHKANDJUMP1(pmi_errno != PMIX_SUCCESS, mpi_errno, MPI_ERR_OTHER, + "**pmix_init", "**pmix_init %d", pmi_errno); + + PMIX_PROC_CONSTRUCT(&pmix_wcproc); + MPL_strncpy(pmix_wcproc.nspace, pmix_proc.nspace, PMIX_MAX_NSLEN); + pmix_wcproc.rank = PMIX_RANK_WILDCARD; } - MPIR_ERR_CHKANDJUMP1(pmi_errno != PMIX_SUCCESS, mpi_errno, MPI_ERR_OTHER, - "**pmix_init", "**pmix_init %d", pmi_errno); rank = pmix_proc.rank; - PMIX_PROC_CONSTRUCT(&pmix_wcproc); - MPL_strncpy(pmix_wcproc.nspace, pmix_proc.nspace, PMIX_MAX_NSLEN); - pmix_wcproc.rank = PMIX_RANK_WILDCARD; pmi_errno = PMIx_Get(&pmix_wcproc, PMIX_JOB_SIZE, NULL, 0, &pvalue); MPIR_ERR_CHKANDJUMP1(pmi_errno != PMIX_SUCCESS, mpi_errno, MPI_ERR_OTHER,