From dab136fe0c8215acf9e10a3760bd5df298f1b156 Mon Sep 17 00:00:00 2001 From: Jayyee-HPC Date: Tue, 15 Jun 2021 10:12:30 -0500 Subject: [PATCH 1/4] hydra: using HASH_REPLACE_STR instead of HASH_ADD_STR --- src/pm/hydra/proxy/pmip_pmi.c | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/pm/hydra/proxy/pmip_pmi.c b/src/pm/hydra/proxy/pmip_pmi.c index cc4ffbe3095..dee2b3528c0 100644 --- a/src/pm/hydra/proxy/pmip_pmi.c +++ b/src/pm/hydra/proxy/pmip_pmi.c @@ -513,7 +513,8 @@ HYD_status fn_keyval_cache(struct pmip_pg *pg, struct PMIU_cmd *pmi) int i; for (i = 0; i < pg->num_elems; i++) { struct cache_elem *elem = pg->cache_get + i; - HASH_ADD_STR(pg->hash_get, key, elem, MPL_MEM_PM); + struct cache_elem *replaced; + HASH_REPLACE_STR(pg->hash_get, key, elem, replaced, MPL_MEM_PM); } for (; i < pg->num_elems + num_tokens; i++) { struct cache_elem *elem = pg->cache_get + i; @@ -521,7 +522,8 @@ HYD_status fn_keyval_cache(struct pmip_pg *pg, struct PMIU_cmd *pmi) HYDU_ERR_CHKANDJUMP(status, NULL == elem->key, HYD_INTERNAL_ERROR, "%s", ""); elem->val = MPL_strdup(tokens[i - pg->num_elems].val); HYDU_ERR_CHKANDJUMP(status, NULL == elem->val, HYD_INTERNAL_ERROR, "%s", ""); - HASH_ADD_STR(pg->hash_get, key, elem, MPL_MEM_PM); + struct cache_elem *replaced; + HASH_REPLACE_STR(pg->hash_get, key, elem, replaced, MPL_MEM_PM); } pg->num_elems += num_tokens; From f48997c81207187ab6d5ae53898e5c125c576023 Mon Sep 17 00:00:00 2001 From: Hui Zhou Date: Tue, 20 Jun 2023 12:23:32 -0500 Subject: [PATCH 2/4] hydra: refactor kvs and fix duplicate keys Refactor kvs to hide internals in lib/pmiserv_common.c. Always add a new key pair to the head, so that we always find the latest key pair in the case of duplicate keys. Although PMI requires users to not put duplicate keys, this fix makes the behavior more predictable. Note: this fixes the session_re_init test. Session reinit skips PMI_Finalize between sessions. The proper fix should introduce a new API, e.g. PMI_KVS_Clear. 
For now, we'll simply make the later key overwrite the previous duplicates. --- src/pm/hydra/lib/pmiserv_common.c | 79 +++++++++++++++++++++++------- src/pm/hydra/lib/pmiserv_common.h | 12 ++++- src/pm/hydra/mpiexec/pmiserv_kvs.c | 9 +--- src/pm/hydra/mpiexec/pmiserv_pmi.c | 53 +++++++++----------- src/pm/hydra/proxy/pmip_pmi.c | 15 ++---- 5 files changed, 99 insertions(+), 69 deletions(-) diff --git a/src/pm/hydra/lib/pmiserv_common.c b/src/pm/hydra/lib/pmiserv_common.c index 9db6e0864e7..d069becd9d5 100644 --- a/src/pm/hydra/lib/pmiserv_common.c +++ b/src/pm/hydra/lib/pmiserv_common.c @@ -92,7 +92,9 @@ HYD_status HYD_pmcd_pmi_allocate_kvs(struct HYD_pmcd_pmi_kvs **kvs) HYDU_MALLOC_OR_JUMP(*kvs, struct HYD_pmcd_pmi_kvs *, sizeof(struct HYD_pmcd_pmi_kvs), status); (*kvs)->key_pair = NULL; - (*kvs)->tail = NULL; + (*kvs)->iter_end = NULL; + (*kvs)->iter_cur = NULL; + (*kvs)->iter_new_only = false; fn_exit: HYDU_FUNC_EXIT(); @@ -119,7 +121,27 @@ void HYD_pmcd_free_pmi_kvs_list(struct HYD_pmcd_pmi_kvs *kvs_list) HYDU_FUNC_EXIT(); } -HYD_status HYD_pmcd_pmi_add_kvs(const char *key, const char *val, struct HYD_pmcd_pmi_kvs *kvs, +HYD_status HYD_pmcd_pmi_kvs_find(struct HYD_pmcd_pmi_kvs *kvs_list, + const char *key, const char **val, int *found) +{ + HYDU_FUNC_ENTER(); + *found = 0; + + struct HYD_pmcd_pmi_kvs_pair *run; + for (run = kvs_list->key_pair; run; run = run->next) { + if (!strcmp(run->key, key)) { + *val = run->val; + *found = 1; + goto fn_exit; + } + } + + fn_exit: + HYDU_FUNC_EXIT(); + return HYD_SUCCESS; +} + +HYD_status HYD_pmcd_pmi_add_kvs(const char *key, const char *val, struct HYD_pmcd_pmi_kvs * kvs, int *ret) { struct HYD_pmcd_pmi_kvs_pair *key_pair; @@ -137,25 +159,18 @@ HYD_status HYD_pmcd_pmi_add_kvs(const char *key, const char *val, struct HYD_pmc if (kvs->key_pair == NULL) { kvs->key_pair = key_pair; - kvs->tail = key_pair; } else { #ifdef PMI_KEY_CHECK - struct HYD_pmcd_pmi_kvs_pair *run, *last; - - for (run = kvs->key_pair; run; run = 
run->next) { - if (!strcmp(run->key, key_pair->key)) { - /* duplicate key found */ - *ret = -1; - goto fn_fail; - } - last = run; + const char *dummy_val; + int found; + HYD_pmcd_pmi_kvs_find(kvs, key, &val, &found); + if (found) { + *ret = -1; + goto fn_fail; } - /* Add key_pair to end of list. */ - last->next = key_pair; -#else - kvs->tail->next = key_pair; - kvs->tail = key_pair; #endif + key_pair->next = kvs->key_pair; + kvs->key_pair = key_pair; } fn_exit: @@ -167,6 +182,36 @@ HYD_status HYD_pmcd_pmi_add_kvs(const char *key, const char *val, struct HYD_pmc goto fn_exit; } +void HYD_pmcd_pmi_kvs_iter_begin(struct HYD_pmcd_pmi_kvs *kvs_list, bool new_only) +{ + kvs_list->iter_begin = kvs_list->key_pair; + kvs_list->iter_cur = kvs_list->iter_begin; + kvs_list->iter_new_only = new_only; +} + +void HYD_pmcd_pmi_kvs_iter_end(struct HYD_pmcd_pmi_kvs *kvs_list) +{ + if (kvs_list->iter_new_only) { + kvs_list->iter_end = kvs_list->iter_begin; + } + kvs_list->iter_begin = NULL; + kvs_list->iter_new_only = false; +} + +bool HYD_pmcd_pmi_kvs_iter_next(struct HYD_pmcd_pmi_kvs *kvs_list, + const char **key, const char **val) +{ + if (kvs_list->iter_cur == NULL || + (kvs_list->iter_new_only && kvs_list->iter_cur == kvs_list->iter_end)) { + return false; + } else { + *key = kvs_list->iter_cur->key; + *val = kvs_list->iter_cur->val; + kvs_list->iter_cur = kvs_list->iter_cur->next; + return true; + } +} + const char *HYD_pmcd_cmd_name(int cmd) { switch (cmd) { diff --git a/src/pm/hydra/lib/pmiserv_common.h b/src/pm/hydra/lib/pmiserv_common.h index b5dc1b01ac5..7473ac7598b 100644 --- a/src/pm/hydra/lib/pmiserv_common.h +++ b/src/pm/hydra/lib/pmiserv_common.h @@ -24,7 +24,11 @@ struct HYD_pmcd_pmi_kvs_pair { struct HYD_pmcd_pmi_kvs { struct HYD_pmcd_pmi_kvs_pair *key_pair; - struct HYD_pmcd_pmi_kvs_pair *tail; + /* iter fields used for HYD_pmiserv_bcast_keyvals */ + struct HYD_pmcd_pmi_kvs_pair *iter_end; + struct HYD_pmcd_pmi_kvs_pair *iter_begin; + struct HYD_pmcd_pmi_kvs_pair 
*iter_cur; + bool iter_new_only; }; /* init header proxy send to server upon connection */ @@ -36,8 +40,14 @@ struct HYD_pmcd_init_hdr { HYD_status HYD_pmcd_pmi_allocate_kvs(struct HYD_pmcd_pmi_kvs **kvs); void HYD_pmcd_free_pmi_kvs_list(struct HYD_pmcd_pmi_kvs *kvs_list); +HYD_status HYD_pmcd_pmi_kvs_find(struct HYD_pmcd_pmi_kvs *kvs_list, + const char *key, const char **val, int *found); HYD_status HYD_pmcd_pmi_add_kvs(const char *key, const char *val, struct HYD_pmcd_pmi_kvs *kvs, int *ret); +void HYD_pmcd_pmi_kvs_iter_begin(struct HYD_pmcd_pmi_kvs *kvs_list, bool new_only); +void HYD_pmcd_pmi_kvs_iter_end(struct HYD_pmcd_pmi_kvs *kvs_list); +bool HYD_pmcd_pmi_kvs_iter_next(struct HYD_pmcd_pmi_kvs *kvs_list, + const char **key, const char **val); /* ---- struct HYD_pmcd_hdr ---- */ diff --git a/src/pm/hydra/mpiexec/pmiserv_kvs.c b/src/pm/hydra/mpiexec/pmiserv_kvs.c index ac98204623f..b7636e3ce9d 100644 --- a/src/pm/hydra/mpiexec/pmiserv_kvs.c +++ b/src/pm/hydra/mpiexec/pmiserv_kvs.c @@ -56,14 +56,7 @@ HYD_status HYD_pmiserv_kvs_get(struct HYD_proxy *proxy, int process_fd, int pgid found = 1; val = pg_scratch->dead_processes; } else { - struct HYD_pmcd_pmi_kvs_pair *run; - for (run = pg_scratch->kvs->key_pair; run; run = run->next) { - if (strcmp(run->key, key) == 0) { - found = 1; - val = run->val; - break; - } - } + HYD_pmcd_pmi_kvs_find(pg_scratch->kvs, key, &val, &found); } if (!found && sync) { diff --git a/src/pm/hydra/mpiexec/pmiserv_pmi.c b/src/pm/hydra/mpiexec/pmiserv_pmi.c index 37d4b966942..ea020884733 100644 --- a/src/pm/hydra/mpiexec/pmiserv_pmi.c +++ b/src/pm/hydra/mpiexec/pmiserv_pmi.c @@ -30,54 +30,45 @@ HYD_status HYD_pmiserv_pmi_reply(struct HYD_proxy * proxy, int process_fd, struc HYD_status HYD_pmiserv_bcast_keyvals(struct HYD_proxy * proxy, int process_fd) { - int keyval_count, arg_count, j; - struct HYD_pmcd_pmi_kvs_pair *run; - struct HYD_pg *pg; - struct HYD_pmcd_pmi_pg_scratch *pg_scratch; HYD_status status = HYD_SUCCESS; - 
HYDU_FUNC_ENTER(); + struct HYD_pg *pg; + struct HYD_pmcd_pmi_pg_scratch *pg_scratch; pg = PMISERV_pg_by_id(proxy->pgid); pg_scratch = (struct HYD_pmcd_pmi_pg_scratch *) pg->pg_scratch; - /* find the number of keyvals */ - keyval_count = 0; - for (run = pg_scratch->kvs->key_pair; run; run = run->next) - keyval_count++; - - keyval_count -= pg_scratch->keyval_dist_count; - - if (keyval_count) { - struct PMIU_cmd pmi; - PMIU_msg_set_query(&pmi, PMIU_WIRE_V1, PMIU_CMD_KVSCACHE, false /* not static */); - arg_count = 1; - for (run = pg_scratch->kvs->key_pair, j = 0; run; run = run->next, j++) { - if (j < pg_scratch->keyval_dist_count) - continue; - - PMIU_cmd_add_str(&pmi, run->key, run->val); - - arg_count++; - if (arg_count >= MAX_PMI_ARGS) { - pg_scratch->keyval_dist_count += (arg_count - 1); - for (int i = 0; i < pg->proxy_count; i++) { - status = HYD_pmiserv_pmi_reply(&pg->proxy_list[i], process_fd, &pmi); - HYDU_ERR_POP(status, "error writing PMI line\n"); - } + struct PMIU_cmd pmi; + int arg_count = 0, num_total = 0; + const char *key, *val; + HYD_pmcd_pmi_kvs_iter_begin(pg_scratch->kvs, true); + while (true) { + bool has_next = HYD_pmcd_pmi_kvs_iter_next(pg_scratch->kvs, &key, &val); + if (has_next) { + if (arg_count == 0) { PMIU_msg_set_query(&pmi, PMIU_WIRE_V1, PMIU_CMD_KVSCACHE, false /* not static */); arg_count = 1; } + PMIU_cmd_add_str(&pmi, key, val); + arg_count++; + num_total++; } - if (arg_count > 1) { - pg_scratch->keyval_dist_count += (arg_count - 1); + if (arg_count >= MAX_PMI_ARGS || (!has_next && arg_count > 1)) { for (int i = 0; i < pg->proxy_count; i++) { status = HYD_pmiserv_pmi_reply(&pg->proxy_list[i], process_fd, &pmi); HYDU_ERR_POP(status, "error writing PMI line\n"); } + arg_count = 0; } + + if (!has_next) { + break; + } + } + HYD_pmcd_pmi_kvs_iter_end(pg_scratch->kvs); + if (num_total > 0) { PMIU_cmd_free_buf(&pmi); } diff --git a/src/pm/hydra/proxy/pmip_pmi.c b/src/pm/hydra/proxy/pmip_pmi.c index dee2b3528c0..52bdd7d1569 100644 --- 
a/src/pm/hydra/proxy/pmip_pmi.c +++ b/src/pm/hydra/proxy/pmip_pmi.c @@ -674,17 +674,9 @@ HYD_status fn_info_getnodeattr(struct pmip_downstream *p, struct PMIU_cmd *pmi) /* if a predefined value is not found, we let the code fall back * to regular search and return an error to the client */ + const char *val; int found; - found = 0; - - /* FIXME: wrap it in e.g. HYD_pmcd_pmi_find_kvs */ - struct HYD_pmcd_pmi_kvs_pair *run; - for (run = PMIP_pg_from_downstream(p)->kvs->key_pair; run; run = run->next) { - if (!strcmp(run->key, key)) { - found = 1; - break; - } - } + HYD_pmcd_pmi_kvs_find(PMIP_pg_from_downstream(p)->kvs, key, &val, &found); if (!found && wait) { status = HYD_pmcd_pmi_v2_queue_req(p, pmi, key); @@ -694,8 +686,7 @@ HYD_status fn_info_getnodeattr(struct pmip_downstream *p, struct PMIU_cmd *pmi) struct PMIU_cmd pmi_response; if (found) { - pmi_errno = - PMIU_msg_set_response_getnodeattr(pmi, &pmi_response, is_static, run->val, true); + pmi_errno = PMIU_msg_set_response_getnodeattr(pmi, &pmi_response, is_static, val, true); } else { pmi_errno = PMIU_msg_set_response_fail(pmi, &pmi_response, is_static, 1, "not_found"); } From ceca98016b7682a07e62660249c97bb9ceb1a373 Mon Sep 17 00:00:00 2001 From: Hui Zhou Date: Tue, 20 Jun 2023 12:41:47 -0500 Subject: [PATCH 3/4] hydra: rename the HYD_pmcd_pmi_kvs_ prefix Shorten it to HYD_kvs_. 
--- src/pm/hydra/lib/pmiserv_common.c | 30 ++++++++++++--------------- src/pm/hydra/lib/pmiserv_common.h | 31 +++++++++++++--------------- src/pm/hydra/mpiexec/pmiserv_kvs.c | 2 +- src/pm/hydra/mpiexec/pmiserv_pmi.c | 6 +++--- src/pm/hydra/mpiexec/pmiserv_pmi.h | 2 +- src/pm/hydra/mpiexec/pmiserv_spawn.c | 6 ++---- src/pm/hydra/proxy/pmip.h | 2 +- src/pm/hydra/proxy/pmip_pmi.c | 2 +- 8 files changed, 36 insertions(+), 45 deletions(-) diff --git a/src/pm/hydra/lib/pmiserv_common.c b/src/pm/hydra/lib/pmiserv_common.c index d069becd9d5..eae6ffe7c5a 100644 --- a/src/pm/hydra/lib/pmiserv_common.c +++ b/src/pm/hydra/lib/pmiserv_common.c @@ -85,12 +85,12 @@ HYD_status HYD_pmcd_pmi_send(int fd, struct PMIU_cmd *pmi, struct HYD_pmcd_hdr * goto fn_exit; } -HYD_status HYD_pmcd_pmi_allocate_kvs(struct HYD_pmcd_pmi_kvs **kvs) +HYD_status HYD_pmcd_pmi_allocate_kvs(struct HYD_kvs **kvs) { HYD_status status = HYD_SUCCESS; HYDU_FUNC_ENTER(); - HYDU_MALLOC_OR_JUMP(*kvs, struct HYD_pmcd_pmi_kvs *, sizeof(struct HYD_pmcd_pmi_kvs), status); + HYDU_MALLOC_OR_JUMP(*kvs, struct HYD_kvs *, sizeof(struct HYD_kvs), status); (*kvs)->key_pair = NULL; (*kvs)->iter_end = NULL; (*kvs)->iter_cur = NULL; @@ -104,9 +104,9 @@ HYD_status HYD_pmcd_pmi_allocate_kvs(struct HYD_pmcd_pmi_kvs **kvs) goto fn_exit; } -void HYD_pmcd_free_pmi_kvs_list(struct HYD_pmcd_pmi_kvs *kvs_list) +void HYD_pmcd_free_pmi_kvs_list(struct HYD_kvs *kvs_list) { - struct HYD_pmcd_pmi_kvs_pair *key_pair, *tmp; + struct HYD_kvs_pair *key_pair, *tmp; HYDU_FUNC_ENTER(); @@ -121,13 +121,12 @@ void HYD_pmcd_free_pmi_kvs_list(struct HYD_pmcd_pmi_kvs *kvs_list) HYDU_FUNC_EXIT(); } -HYD_status HYD_pmcd_pmi_kvs_find(struct HYD_pmcd_pmi_kvs *kvs_list, - const char *key, const char **val, int *found) +HYD_status HYD_kvs_find(struct HYD_kvs *kvs_list, const char *key, const char **val, int *found) { HYDU_FUNC_ENTER(); *found = 0; - struct HYD_pmcd_pmi_kvs_pair *run; + struct HYD_kvs_pair *run; for (run = kvs_list->key_pair; run; run = 
run->next) { if (!strcmp(run->key, key)) { *val = run->val; @@ -141,16 +140,14 @@ HYD_status HYD_pmcd_pmi_kvs_find(struct HYD_pmcd_pmi_kvs *kvs_list, return HYD_SUCCESS; } -HYD_status HYD_pmcd_pmi_add_kvs(const char *key, const char *val, struct HYD_pmcd_pmi_kvs * kvs, - int *ret) +HYD_status HYD_pmcd_pmi_add_kvs(const char *key, const char *val, struct HYD_kvs * kvs, int *ret) { - struct HYD_pmcd_pmi_kvs_pair *key_pair; + struct HYD_kvs_pair *key_pair; HYD_status status = HYD_SUCCESS; HYDU_FUNC_ENTER(); - HYDU_MALLOC_OR_JUMP(key_pair, struct HYD_pmcd_pmi_kvs_pair *, - sizeof(struct HYD_pmcd_pmi_kvs_pair), status); + HYDU_MALLOC_OR_JUMP(key_pair, struct HYD_kvs_pair *, sizeof(struct HYD_kvs_pair), status); snprintf(key_pair->key, PMI_MAXKEYLEN, "%s", key); snprintf(key_pair->val, PMI_MAXVALLEN, "%s", val); key_pair->next = NULL; @@ -163,7 +160,7 @@ HYD_status HYD_pmcd_pmi_add_kvs(const char *key, const char *val, struct HYD_pmc #ifdef PMI_KEY_CHECK const char *dummy_val; int found; - HYD_pmcd_pmi_kvs_find(kvs, key, &val, &found); + HYD_kvs_find(kvs, key, &val, &found); if (found) { *ret = -1; goto fn_fail; @@ -182,14 +179,14 @@ HYD_status HYD_pmcd_pmi_add_kvs(const char *key, const char *val, struct HYD_pmc goto fn_exit; } -void HYD_pmcd_pmi_kvs_iter_begin(struct HYD_pmcd_pmi_kvs *kvs_list, bool new_only) +void HYD_kvs_iter_begin(struct HYD_kvs *kvs_list, bool new_only) { kvs_list->iter_begin = kvs_list->key_pair; kvs_list->iter_cur = kvs_list->iter_begin; kvs_list->iter_new_only = new_only; } -void HYD_pmcd_pmi_kvs_iter_end(struct HYD_pmcd_pmi_kvs *kvs_list) +void HYD_kvs_iter_end(struct HYD_kvs *kvs_list) { if (kvs_list->iter_new_only) { kvs_list->iter_end = kvs_list->iter_begin; @@ -198,8 +195,7 @@ void HYD_pmcd_pmi_kvs_iter_end(struct HYD_pmcd_pmi_kvs *kvs_list) kvs_list->iter_new_only = false; } -bool HYD_pmcd_pmi_kvs_iter_next(struct HYD_pmcd_pmi_kvs *kvs_list, - const char **key, const char **val) +bool HYD_kvs_iter_next(struct HYD_kvs *kvs_list, const char 
**key, const char **val) { if (kvs_list->iter_cur == NULL || (kvs_list->iter_new_only && kvs_list->iter_cur == kvs_list->iter_end)) { diff --git a/src/pm/hydra/lib/pmiserv_common.h b/src/pm/hydra/lib/pmiserv_common.h index 7473ac7598b..4fab9a5267b 100644 --- a/src/pm/hydra/lib/pmiserv_common.h +++ b/src/pm/hydra/lib/pmiserv_common.h @@ -16,18 +16,18 @@ #define PMI_MAXVALLEN (1024) /* max length of value in keyval space */ #define PMI_MAXKVSLEN (256) /* max length of various names */ -struct HYD_pmcd_pmi_kvs_pair { +struct HYD_kvs_pair { char key[PMI_MAXKEYLEN]; char val[PMI_MAXVALLEN]; - struct HYD_pmcd_pmi_kvs_pair *next; + struct HYD_kvs_pair *next; }; -struct HYD_pmcd_pmi_kvs { - struct HYD_pmcd_pmi_kvs_pair *key_pair; +struct HYD_kvs { + struct HYD_kvs_pair *key_pair; /* iter fields used for HYD_pmiserv_bcast_keyvals */ - struct HYD_pmcd_pmi_kvs_pair *iter_end; - struct HYD_pmcd_pmi_kvs_pair *iter_begin; - struct HYD_pmcd_pmi_kvs_pair *iter_cur; + struct HYD_kvs_pair *iter_end; + struct HYD_kvs_pair *iter_begin; + struct HYD_kvs_pair *iter_cur; bool iter_new_only; }; @@ -38,16 +38,13 @@ struct HYD_pmcd_init_hdr { int proxy_id; }; -HYD_status HYD_pmcd_pmi_allocate_kvs(struct HYD_pmcd_pmi_kvs **kvs); -void HYD_pmcd_free_pmi_kvs_list(struct HYD_pmcd_pmi_kvs *kvs_list); -HYD_status HYD_pmcd_pmi_kvs_find(struct HYD_pmcd_pmi_kvs *kvs_list, - const char *key, const char **val, int *found); -HYD_status HYD_pmcd_pmi_add_kvs(const char *key, const char *val, struct HYD_pmcd_pmi_kvs *kvs, - int *ret); -void HYD_pmcd_pmi_kvs_iter_begin(struct HYD_pmcd_pmi_kvs *kvs_list, bool new_only); -void HYD_pmcd_pmi_kvs_iter_end(struct HYD_pmcd_pmi_kvs *kvs_list); -bool HYD_pmcd_pmi_kvs_iter_next(struct HYD_pmcd_pmi_kvs *kvs_list, - const char **key, const char **val); +HYD_status HYD_pmcd_pmi_allocate_kvs(struct HYD_kvs **kvs); +void HYD_pmcd_free_pmi_kvs_list(struct HYD_kvs *kvs_list); +HYD_status HYD_kvs_find(struct HYD_kvs *kvs_list, const char *key, const char **val, int *found); 
+HYD_status HYD_pmcd_pmi_add_kvs(const char *key, const char *val, struct HYD_kvs *kvs, int *ret); +void HYD_kvs_iter_begin(struct HYD_kvs *kvs_list, bool new_only); +void HYD_kvs_iter_end(struct HYD_kvs *kvs_list); +bool HYD_kvs_iter_next(struct HYD_kvs *kvs_list, const char **key, const char **val); /* ---- struct HYD_pmcd_hdr ---- */ diff --git a/src/pm/hydra/mpiexec/pmiserv_kvs.c b/src/pm/hydra/mpiexec/pmiserv_kvs.c index b7636e3ce9d..1cebeb8a19b 100644 --- a/src/pm/hydra/mpiexec/pmiserv_kvs.c +++ b/src/pm/hydra/mpiexec/pmiserv_kvs.c @@ -56,7 +56,7 @@ HYD_status HYD_pmiserv_kvs_get(struct HYD_proxy *proxy, int process_fd, int pgid found = 1; val = pg_scratch->dead_processes; } else { - HYD_pmcd_pmi_kvs_find(pg_scratch->kvs, key, &val, &found); + HYD_kvs_find(pg_scratch->kvs, key, &val, &found); } if (!found && sync) { diff --git a/src/pm/hydra/mpiexec/pmiserv_pmi.c b/src/pm/hydra/mpiexec/pmiserv_pmi.c index ea020884733..8782591908e 100644 --- a/src/pm/hydra/mpiexec/pmiserv_pmi.c +++ b/src/pm/hydra/mpiexec/pmiserv_pmi.c @@ -42,9 +42,9 @@ HYD_status HYD_pmiserv_bcast_keyvals(struct HYD_proxy * proxy, int process_fd) int arg_count = 0, num_total = 0; const char *key, *val; - HYD_pmcd_pmi_kvs_iter_begin(pg_scratch->kvs, true); + HYD_kvs_iter_begin(pg_scratch->kvs, true); while (true) { - bool has_next = HYD_pmcd_pmi_kvs_iter_next(pg_scratch->kvs, &key, &val); + bool has_next = HYD_kvs_iter_next(pg_scratch->kvs, &key, &val); if (has_next) { if (arg_count == 0) { PMIU_msg_set_query(&pmi, PMIU_WIRE_V1, PMIU_CMD_KVSCACHE, false /* not static */); @@ -67,7 +67,7 @@ HYD_status HYD_pmiserv_bcast_keyvals(struct HYD_proxy * proxy, int process_fd) break; } } - HYD_pmcd_pmi_kvs_iter_end(pg_scratch->kvs); + HYD_kvs_iter_end(pg_scratch->kvs); if (num_total > 0) { PMIU_cmd_free_buf(&pmi); } diff --git a/src/pm/hydra/mpiexec/pmiserv_pmi.h b/src/pm/hydra/mpiexec/pmiserv_pmi.h index 2f356840233..83c3163c193 100644 --- a/src/pm/hydra/mpiexec/pmiserv_pmi.h +++ 
b/src/pm/hydra/mpiexec/pmiserv_pmi.h @@ -27,7 +27,7 @@ struct HYD_pmcd_pmi_pg_scratch { int dead_process_count; char kvsname[PMI_MAXKVSLEN]; - struct HYD_pmcd_pmi_kvs *kvs; + struct HYD_kvs *kvs; int keyval_dist_count; /* Number of keyvals distributed */ }; diff --git a/src/pm/hydra/mpiexec/pmiserv_spawn.c b/src/pm/hydra/mpiexec/pmiserv_spawn.c index ebdbbdd0f91..c512fd1af1b 100644 --- a/src/pm/hydra/mpiexec/pmiserv_spawn.c +++ b/src/pm/hydra/mpiexec/pmiserv_spawn.c @@ -17,8 +17,7 @@ static HYD_status allocate_spawn_pg(int spawner_pgid); static HYD_status fill_exec_params(struct HYD_exec *exec, const char *execname, int nprocs, int argcnt, const char **argv, int infonum, struct PMIU_token *infos); -static HYD_status fill_preput_kvs(struct HYD_pmcd_pmi_kvs *kvs, - int preput_num, struct PMIU_token *infos); +static HYD_status fill_preput_kvs(struct HYD_kvs *kvs, int preput_num, struct PMIU_token *infos); static HYD_status do_spawn(void); static char *get_exec_path(const char *execname, const char *path); @@ -205,8 +204,7 @@ static HYD_status fill_exec_params(struct HYD_exec *exec, const char *execname, goto fn_exit; } -static HYD_status fill_preput_kvs(struct HYD_pmcd_pmi_kvs *kvs, - int preput_num, struct PMIU_token *infos) +static HYD_status fill_preput_kvs(struct HYD_kvs *kvs, int preput_num, struct PMIU_token *infos) { HYD_status status = HYD_SUCCESS; diff --git a/src/pm/hydra/proxy/pmip.h b/src/pm/hydra/proxy/pmip.h index 7d53b7f7b3f..c9b3f990225 100644 --- a/src/pm/hydra/proxy/pmip.h +++ b/src/pm/hydra/proxy/pmip.h @@ -101,7 +101,7 @@ struct pmip_pg { struct HYD_exec *exec_list; /* This is for PMI-2 info-putnodeattr. Should it be per-node or per pg? 
*/ - struct HYD_pmcd_pmi_kvs *kvs; + struct HYD_kvs *kvs; /* PMI-1 caches server kvs locally */ struct cache_put_elem cache_put; diff --git a/src/pm/hydra/proxy/pmip_pmi.c b/src/pm/hydra/proxy/pmip_pmi.c index 52bdd7d1569..b3b90df66c2 100644 --- a/src/pm/hydra/proxy/pmip_pmi.c +++ b/src/pm/hydra/proxy/pmip_pmi.c @@ -676,7 +676,7 @@ HYD_status fn_info_getnodeattr(struct pmip_downstream *p, struct PMIU_cmd *pmi) const char *val; int found; - HYD_pmcd_pmi_kvs_find(PMIP_pg_from_downstream(p)->kvs, key, &val, &found); + HYD_kvs_find(PMIP_pg_from_downstream(p)->kvs, key, &val, &found); if (!found && wait) { status = HYD_pmcd_pmi_v2_queue_req(p, pmi, key); From a67bf3ceefb2475383679c2ed53529f80a0e07f0 Mon Sep 17 00:00:00 2001 From: Hui Zhou Date: Wed, 21 Jun 2023 08:07:47 -0500 Subject: [PATCH 4/4] util/pmi: avoid calling PMIx_Init multiple times Since we now only call PMIx_Finalize once in the atexit handler, we need to make sure that we only call PMIx_Init once. Otherwise, the openpmix server may complain about an abnormal exit. 
--- src/util/mpir_pmi.c | 30 +++++++++++++++++++----------- 1 file changed, 19 insertions(+), 11 deletions(-) diff --git a/src/util/mpir_pmi.c b/src/util/mpir_pmi.c index 1d37e78ed0f..7753288d7ac 100644 --- a/src/util/mpir_pmi.c +++ b/src/util/mpir_pmi.c @@ -105,6 +105,7 @@ static char *pmi_kvs_name; #elif defined USE_PMI2_API static char *pmi_jobid; #elif defined USE_PMIX_API +static int pmix_init_count = 0; static pmix_proc_t pmix_proc; static pmix_proc_t pmix_wcproc; #endif @@ -119,6 +120,7 @@ static void MPIR_pmi_finalize_on_exit(void) PMI2_Finalize(); #elif defined USE_PMIX_API PMIx_Finalize(NULL, 0); + pmix_init_count = 0; #endif } @@ -189,20 +191,26 @@ int MPIR_pmi_init(void) pmix_value_t *pvalue = NULL; - pmi_errno = PMIx_Init(&pmix_proc, NULL, 0); - if (pmi_errno == PMIX_ERR_UNREACH) { - /* no pmi server, assume we are a singleton */ - rank = 0; - size = 1; - goto singleton_out; + /* Since we only call PMIx_Finalize once at `atexit` handler, we need prevent + * calling PMIx_Init multiple times. */ + pmix_init_count++; + if (pmix_init_count == 1) { + pmi_errno = PMIx_Init(&pmix_proc, NULL, 0); + if (pmi_errno == PMIX_ERR_UNREACH) { + /* no pmi server, assume we are a singleton */ + rank = 0; + size = 1; + goto singleton_out; + } + MPIR_ERR_CHKANDJUMP1(pmi_errno != PMIX_SUCCESS, mpi_errno, MPI_ERR_OTHER, + "**pmix_init", "**pmix_init %d", pmi_errno); + + PMIX_PROC_CONSTRUCT(&pmix_wcproc); + MPL_strncpy(pmix_wcproc.nspace, pmix_proc.nspace, PMIX_MAX_NSLEN); + pmix_wcproc.rank = PMIX_RANK_WILDCARD; } - MPIR_ERR_CHKANDJUMP1(pmi_errno != PMIX_SUCCESS, mpi_errno, MPI_ERR_OTHER, - "**pmix_init", "**pmix_init %d", pmi_errno); rank = pmix_proc.rank; - PMIX_PROC_CONSTRUCT(&pmix_wcproc); - MPL_strncpy(pmix_wcproc.nspace, pmix_proc.nspace, PMIX_MAX_NSLEN); - pmix_wcproc.rank = PMIX_RANK_WILDCARD; pmi_errno = PMIx_Get(&pmix_wcproc, PMIX_JOB_SIZE, NULL, 0, &pvalue); MPIR_ERR_CHKANDJUMP1(pmi_errno != PMIX_SUCCESS, mpi_errno, MPI_ERR_OTHER,