Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

hydra: refactor kvs and fix duplicate keys #6564

Merged
merged 4 commits into from
Jun 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 66 additions & 25 deletions src/pm/hydra/lib/pmiserv_common.c
Original file line number Diff line number Diff line change
Expand Up @@ -85,14 +85,16 @@ HYD_status HYD_pmcd_pmi_send(int fd, struct PMIU_cmd *pmi, struct HYD_pmcd_hdr *
goto fn_exit;
}

HYD_status HYD_pmcd_pmi_allocate_kvs(struct HYD_pmcd_pmi_kvs **kvs)
HYD_status HYD_pmcd_pmi_allocate_kvs(struct HYD_kvs **kvs)
{
HYD_status status = HYD_SUCCESS;
HYDU_FUNC_ENTER();

HYDU_MALLOC_OR_JUMP(*kvs, struct HYD_pmcd_pmi_kvs *, sizeof(struct HYD_pmcd_pmi_kvs), status);
HYDU_MALLOC_OR_JUMP(*kvs, struct HYD_kvs *, sizeof(struct HYD_kvs), status);
(*kvs)->key_pair = NULL;
(*kvs)->tail = NULL;
(*kvs)->iter_end = NULL;
(*kvs)->iter_cur = NULL;
(*kvs)->iter_new_only = false;

fn_exit:
HYDU_FUNC_EXIT();
Expand All @@ -102,9 +104,9 @@ HYD_status HYD_pmcd_pmi_allocate_kvs(struct HYD_pmcd_pmi_kvs **kvs)
goto fn_exit;
}

void HYD_pmcd_free_pmi_kvs_list(struct HYD_pmcd_pmi_kvs *kvs_list)
void HYD_pmcd_free_pmi_kvs_list(struct HYD_kvs *kvs_list)
{
struct HYD_pmcd_pmi_kvs_pair *key_pair, *tmp;
struct HYD_kvs_pair *key_pair, *tmp;

HYDU_FUNC_ENTER();

Expand All @@ -119,16 +121,33 @@ void HYD_pmcd_free_pmi_kvs_list(struct HYD_pmcd_pmi_kvs *kvs_list)
HYDU_FUNC_EXIT();
}

HYD_status HYD_pmcd_pmi_add_kvs(const char *key, const char *val, struct HYD_pmcd_pmi_kvs *kvs,
int *ret)
HYD_status HYD_kvs_find(struct HYD_kvs *kvs_list, const char *key, const char **val, int *found)
{
struct HYD_pmcd_pmi_kvs_pair *key_pair;
HYDU_FUNC_ENTER();
*found = 0;

struct HYD_kvs_pair *run;
for (run = kvs_list->key_pair; run; run = run->next) {
if (!strcmp(run->key, key)) {
*val = run->val;
*found = 1;
goto fn_exit;
}
}

fn_exit:
HYDU_FUNC_EXIT();
return HYD_SUCCESS;
}

HYD_status HYD_pmcd_pmi_add_kvs(const char *key, const char *val, struct HYD_kvs * kvs, int *ret)
{
struct HYD_kvs_pair *key_pair;
HYD_status status = HYD_SUCCESS;

HYDU_FUNC_ENTER();

HYDU_MALLOC_OR_JUMP(key_pair, struct HYD_pmcd_pmi_kvs_pair *,
sizeof(struct HYD_pmcd_pmi_kvs_pair), status);
HYDU_MALLOC_OR_JUMP(key_pair, struct HYD_kvs_pair *, sizeof(struct HYD_kvs_pair), status);
snprintf(key_pair->key, PMI_MAXKEYLEN, "%s", key);
snprintf(key_pair->val, PMI_MAXVALLEN, "%s", val);
key_pair->next = NULL;
Expand All @@ -137,25 +156,18 @@ HYD_status HYD_pmcd_pmi_add_kvs(const char *key, const char *val, struct HYD_pmc

if (kvs->key_pair == NULL) {
kvs->key_pair = key_pair;
kvs->tail = key_pair;
} else {
#ifdef PMI_KEY_CHECK
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you think it would make sense to enable duplicate keys also with debugging of MPICH/ hydra enabled, e.g., by changing the PMI_KEY_CHECK into a debug message of hydra for duplicate keys instead of an error?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it makes sense. Do you want to make a PR for it?

struct HYD_pmcd_pmi_kvs_pair *run, *last;

for (run = kvs->key_pair; run; run = run->next) {
if (!strcmp(run->key, key_pair->key)) {
/* duplicate key found */
*ret = -1;
goto fn_fail;
}
last = run;
const char *dummy_val;
int found;
HYD_kvs_find(kvs, key, &val, &found);
if (found) {
*ret = -1;
goto fn_fail;
}
/* Add key_pair to end of list. */
last->next = key_pair;
#else
kvs->tail->next = key_pair;
kvs->tail = key_pair;
#endif
key_pair->next = kvs->key_pair;
kvs->key_pair = key_pair;
}

fn_exit:
Expand All @@ -167,6 +179,35 @@ HYD_status HYD_pmcd_pmi_add_kvs(const char *key, const char *val, struct HYD_pmc
goto fn_exit;
}

void HYD_kvs_iter_begin(struct HYD_kvs *kvs_list, bool new_only)
{
kvs_list->iter_begin = kvs_list->key_pair;
kvs_list->iter_cur = kvs_list->iter_begin;
kvs_list->iter_new_only = new_only;
}

void HYD_kvs_iter_end(struct HYD_kvs *kvs_list)
{
if (kvs_list->iter_new_only) {
kvs_list->iter_end = kvs_list->iter_begin;
}
kvs_list->iter_begin = NULL;
kvs_list->iter_new_only = false;
}

bool HYD_kvs_iter_next(struct HYD_kvs *kvs_list, const char **key, const char **val)
{
if (kvs_list->iter_cur == NULL ||
(kvs_list->iter_new_only && kvs_list->iter_cur == kvs_list->iter_end)) {
return false;
} else {
*key = kvs_list->iter_cur->key;
*val = kvs_list->iter_cur->val;
kvs_list->iter_cur = kvs_list->iter_cur->next;
return true;
}
}

const char *HYD_pmcd_cmd_name(int cmd)
{
switch (cmd) {
Expand Down
25 changes: 16 additions & 9 deletions src/pm/hydra/lib/pmiserv_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,19 @@
#define PMI_MAXVALLEN (1024) /* max length of value in keyval space */
#define PMI_MAXKVSLEN (256) /* max length of various names */

struct HYD_pmcd_pmi_kvs_pair {
struct HYD_kvs_pair {
char key[PMI_MAXKEYLEN];
char val[PMI_MAXVALLEN];
struct HYD_pmcd_pmi_kvs_pair *next;
struct HYD_kvs_pair *next;
};

struct HYD_pmcd_pmi_kvs {
struct HYD_pmcd_pmi_kvs_pair *key_pair;
struct HYD_pmcd_pmi_kvs_pair *tail;
struct HYD_kvs {
struct HYD_kvs_pair *key_pair;
/* iter fields used for HYD_pmiserv_bcast_keyvals */
struct HYD_kvs_pair *iter_end;
struct HYD_kvs_pair *iter_begin;
struct HYD_kvs_pair *iter_cur;
bool iter_new_only;
};

/* init header proxy send to server upon connection */
Expand All @@ -34,10 +38,13 @@ struct HYD_pmcd_init_hdr {
int proxy_id;
};

HYD_status HYD_pmcd_pmi_allocate_kvs(struct HYD_pmcd_pmi_kvs **kvs);
void HYD_pmcd_free_pmi_kvs_list(struct HYD_pmcd_pmi_kvs *kvs_list);
HYD_status HYD_pmcd_pmi_add_kvs(const char *key, const char *val, struct HYD_pmcd_pmi_kvs *kvs,
int *ret);
HYD_status HYD_pmcd_pmi_allocate_kvs(struct HYD_kvs **kvs);
void HYD_pmcd_free_pmi_kvs_list(struct HYD_kvs *kvs_list);
HYD_status HYD_kvs_find(struct HYD_kvs *kvs_list, const char *key, const char **val, int *found);
HYD_status HYD_pmcd_pmi_add_kvs(const char *key, const char *val, struct HYD_kvs *kvs, int *ret);
void HYD_kvs_iter_begin(struct HYD_kvs *kvs_list, bool new_only);
void HYD_kvs_iter_end(struct HYD_kvs *kvs_list);
bool HYD_kvs_iter_next(struct HYD_kvs *kvs_list, const char **key, const char **val);

/* ---- struct HYD_pmcd_hdr ---- */

Expand Down
9 changes: 1 addition & 8 deletions src/pm/hydra/mpiexec/pmiserv_kvs.c
Original file line number Diff line number Diff line change
Expand Up @@ -56,14 +56,7 @@ HYD_status HYD_pmiserv_kvs_get(struct HYD_proxy *proxy, int process_fd, int pgid
found = 1;
val = pg_scratch->dead_processes;
} else {
struct HYD_pmcd_pmi_kvs_pair *run;
for (run = pg_scratch->kvs->key_pair; run; run = run->next) {
if (strcmp(run->key, key) == 0) {
found = 1;
val = run->val;
break;
}
}
HYD_kvs_find(pg_scratch->kvs, key, &val, &found);
}

if (!found && sync) {
Expand Down
53 changes: 22 additions & 31 deletions src/pm/hydra/mpiexec/pmiserv_pmi.c
Original file line number Diff line number Diff line change
Expand Up @@ -30,54 +30,45 @@ HYD_status HYD_pmiserv_pmi_reply(struct HYD_proxy * proxy, int process_fd, struc

HYD_status HYD_pmiserv_bcast_keyvals(struct HYD_proxy * proxy, int process_fd)
{
int keyval_count, arg_count, j;
struct HYD_pmcd_pmi_kvs_pair *run;
struct HYD_pg *pg;
struct HYD_pmcd_pmi_pg_scratch *pg_scratch;
HYD_status status = HYD_SUCCESS;

HYDU_FUNC_ENTER();

struct HYD_pg *pg;
struct HYD_pmcd_pmi_pg_scratch *pg_scratch;
pg = PMISERV_pg_by_id(proxy->pgid);
pg_scratch = (struct HYD_pmcd_pmi_pg_scratch *) pg->pg_scratch;

/* find the number of keyvals */
keyval_count = 0;
for (run = pg_scratch->kvs->key_pair; run; run = run->next)
keyval_count++;

keyval_count -= pg_scratch->keyval_dist_count;

if (keyval_count) {
struct PMIU_cmd pmi;
PMIU_msg_set_query(&pmi, PMIU_WIRE_V1, PMIU_CMD_KVSCACHE, false /* not static */);
arg_count = 1;
for (run = pg_scratch->kvs->key_pair, j = 0; run; run = run->next, j++) {
if (j < pg_scratch->keyval_dist_count)
continue;

PMIU_cmd_add_str(&pmi, run->key, run->val);

arg_count++;
if (arg_count >= MAX_PMI_ARGS) {
pg_scratch->keyval_dist_count += (arg_count - 1);
for (int i = 0; i < pg->proxy_count; i++) {
status = HYD_pmiserv_pmi_reply(&pg->proxy_list[i], process_fd, &pmi);
HYDU_ERR_POP(status, "error writing PMI line\n");
}
struct PMIU_cmd pmi;
int arg_count = 0, num_total = 0;

const char *key, *val;
HYD_kvs_iter_begin(pg_scratch->kvs, true);
while (true) {
bool has_next = HYD_kvs_iter_next(pg_scratch->kvs, &key, &val);
if (has_next) {
if (arg_count == 0) {
PMIU_msg_set_query(&pmi, PMIU_WIRE_V1, PMIU_CMD_KVSCACHE, false /* not static */);
arg_count = 1;
}
PMIU_cmd_add_str(&pmi, key, val);
arg_count++;
num_total++;
}

if (arg_count > 1) {
pg_scratch->keyval_dist_count += (arg_count - 1);
if (arg_count >= MAX_PMI_ARGS || (!has_next && arg_count > 1)) {
for (int i = 0; i < pg->proxy_count; i++) {
status = HYD_pmiserv_pmi_reply(&pg->proxy_list[i], process_fd, &pmi);
HYDU_ERR_POP(status, "error writing PMI line\n");
}
arg_count = 0;
}

if (!has_next) {
break;
}
}
HYD_kvs_iter_end(pg_scratch->kvs);
if (num_total > 0) {
PMIU_cmd_free_buf(&pmi);
}

Expand Down
2 changes: 1 addition & 1 deletion src/pm/hydra/mpiexec/pmiserv_pmi.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ struct HYD_pmcd_pmi_pg_scratch {
int dead_process_count;

char kvsname[PMI_MAXKVSLEN];
struct HYD_pmcd_pmi_kvs *kvs;
struct HYD_kvs *kvs;
int keyval_dist_count; /* Number of keyvals distributed */
};

Expand Down
6 changes: 2 additions & 4 deletions src/pm/hydra/mpiexec/pmiserv_spawn.c
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,7 @@ static HYD_status allocate_spawn_pg(int spawner_pgid);
static HYD_status fill_exec_params(struct HYD_exec *exec, const char *execname, int nprocs,
int argcnt, const char **argv,
int infonum, struct PMIU_token *infos);
static HYD_status fill_preput_kvs(struct HYD_pmcd_pmi_kvs *kvs,
int preput_num, struct PMIU_token *infos);
static HYD_status fill_preput_kvs(struct HYD_kvs *kvs, int preput_num, struct PMIU_token *infos);
static HYD_status do_spawn(void);

static char *get_exec_path(const char *execname, const char *path);
Expand Down Expand Up @@ -205,8 +204,7 @@ static HYD_status fill_exec_params(struct HYD_exec *exec, const char *execname,
goto fn_exit;
}

static HYD_status fill_preput_kvs(struct HYD_pmcd_pmi_kvs *kvs,
int preput_num, struct PMIU_token *infos)
static HYD_status fill_preput_kvs(struct HYD_kvs *kvs, int preput_num, struct PMIU_token *infos)
{
HYD_status status = HYD_SUCCESS;

Expand Down
2 changes: 1 addition & 1 deletion src/pm/hydra/proxy/pmip.h
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ struct pmip_pg {
struct HYD_exec *exec_list;

/* This is for PMI-2 info-putnodeattr. Should it be per-node or per pg? */
struct HYD_pmcd_pmi_kvs *kvs;
struct HYD_kvs *kvs;

/* PMI-1 caches server kvs locally */
struct cache_put_elem cache_put;
Expand Down
21 changes: 7 additions & 14 deletions src/pm/hydra/proxy/pmip_pmi.c
Original file line number Diff line number Diff line change
Expand Up @@ -513,15 +513,17 @@ HYD_status fn_keyval_cache(struct pmip_pg *pg, struct PMIU_cmd *pmi)
int i;
for (i = 0; i < pg->num_elems; i++) {
struct cache_elem *elem = pg->cache_get + i;
HASH_ADD_STR(pg->hash_get, key, elem, MPL_MEM_PM);
struct cache_elem *replaced;
HASH_REPLACE_STR(pg->hash_get, key, elem, replaced, MPL_MEM_PM);
}
for (; i < pg->num_elems + num_tokens; i++) {
struct cache_elem *elem = pg->cache_get + i;
elem->key = MPL_strdup(tokens[i - pg->num_elems].key);
HYDU_ERR_CHKANDJUMP(status, NULL == elem->key, HYD_INTERNAL_ERROR, "%s", "");
elem->val = MPL_strdup(tokens[i - pg->num_elems].val);
HYDU_ERR_CHKANDJUMP(status, NULL == elem->val, HYD_INTERNAL_ERROR, "%s", "");
HASH_ADD_STR(pg->hash_get, key, elem, MPL_MEM_PM);
struct cache_elem *replaced;
HASH_REPLACE_STR(pg->hash_get, key, elem, replaced, MPL_MEM_PM);
}
pg->num_elems += num_tokens;

Expand Down Expand Up @@ -672,17 +674,9 @@ HYD_status fn_info_getnodeattr(struct pmip_downstream *p, struct PMIU_cmd *pmi)
/* if a predefined value is not found, we let the code fall back
* to regular search and return an error to the client */

const char *val;
int found;
found = 0;

/* FIXME: wrap it in e.g. HYD_pmcd_pmi_find_kvs */
struct HYD_pmcd_pmi_kvs_pair *run;
for (run = PMIP_pg_from_downstream(p)->kvs->key_pair; run; run = run->next) {
if (!strcmp(run->key, key)) {
found = 1;
break;
}
}
HYD_kvs_find(PMIP_pg_from_downstream(p)->kvs, key, &val, &found);

if (!found && wait) {
status = HYD_pmcd_pmi_v2_queue_req(p, pmi, key);
Expand All @@ -692,8 +686,7 @@ HYD_status fn_info_getnodeattr(struct pmip_downstream *p, struct PMIU_cmd *pmi)

struct PMIU_cmd pmi_response;
if (found) {
pmi_errno =
PMIU_msg_set_response_getnodeattr(pmi, &pmi_response, is_static, run->val, true);
pmi_errno = PMIU_msg_set_response_getnodeattr(pmi, &pmi_response, is_static, val, true);
} else {
pmi_errno = PMIU_msg_set_response_fail(pmi, &pmi_response, is_static, 1, "not_found");
}
Expand Down
Loading