Skip to content

Commit

Permalink
sessions:deworldify behavior of pmix pset lookup
Browse files Browse the repository at this point in the history
It turns out that the existing ompi_instance_group_pmix_pset
implementation assumes an MPI_COMM_WORLD type of model.

This prevents the ability to use more dynamically generated process
sets, possibly using an external agent.

Switch to using the pmix pset membership query to find new pset
membership.

Related to #10862
Related to openpmix/prrte#1906

prrte changes in above referenced PR are necessary for creating
groups/communicators from psets defined by --pset option on the
mpirun command line.

Signed-off-by: Howard Pritchard <[email protected]>
  • Loading branch information
hppritcha committed Jan 11, 2024
1 parent 5fa32f7 commit 541a17b
Showing 1 changed file with 67 additions and 54 deletions.
121 changes: 67 additions & 54 deletions ompi/instance/instance.c
Original file line number Diff line number Diff line change
Expand Up @@ -1091,7 +1091,8 @@ int ompi_instance_get_num_psets (ompi_instance_t *instance, int *npset_names)

int ompi_instance_get_nth_pset (ompi_instance_t *instance, int n, int *len, char *pset_name)
{
if (NULL == ompi_mpi_instance_pmix_psets && n >= ompi_instance_builtin_count) {
if (NULL == ompi_mpi_instance_pmix_psets ||
(size_t) n >= (ompi_instance_builtin_count + ompi_mpi_instance_num_pmix_psets)) {
ompi_instance_refresh_pmix_psets (PMIX_QUERY_PSET_NAMES);
}

Expand Down Expand Up @@ -1229,71 +1230,83 @@ static int ompi_instance_group_self (ompi_instance_t *instance, ompi_group_t **g

static int ompi_instance_group_pmix_pset (ompi_instance_t *instance, const char *pset_name, ompi_group_t **group_out)
{
int ret = OMPI_SUCCESS;
size_t i,n;
bool isnew, try_again = false, refresh = true;
pmix_status_t rc;
pmix_proc_t p;
ompi_group_t *group;
pmix_value_t *pval = NULL;
char *stmp = NULL;
size_t size = 0;

/* make the group large enough to hold world */
group = ompi_group_allocate (NULL, ompi_process_info.num_procs);
if (OPAL_UNLIKELY(NULL == group)) {
return OMPI_ERR_OUT_OF_RESOURCE;
}
ompi_group_t *group = NULL;
pmix_query_t query;
pmix_info_t *info = NULL;
size_t ninfo;
opal_process_name_t pname;

PMIX_QUERY_CONSTRUCT(&query);
PMIX_ARGV_APPEND(rc, query.keys, PMIX_QUERY_PSET_MEMBERSHIP);
PMIX_INFO_CREATE(query.qualifiers, 1);
query.nqual = 1;
PMIX_INFO_LOAD(&query.qualifiers[0], PMIX_PSET_NAME, pset_name, PMIX_STRING);

for (size_t i = 0 ; i < ompi_process_info.num_procs ; ++i) {
opal_process_name_t name = {.vpid = i, .jobid = OMPI_PROC_MY_NAME->jobid};
/*
* First try finding in the local PMIx cache, if not found, try a refresh
*/
fn_try_again:
rc = PMIx_Query_info(&query, 1, &info, &ninfo);
if (PMIX_SUCCESS != (rc = PMIx_Query_info(&query, 1, &info, &ninfo)) || 0 == ninfo) {
if ((PMIX_ERR_NOT_FOUND == rc) && (false == try_again)) {
try_again = true;
PMIX_QUERY_DESTRUCT(&query);
PMIX_QUERY_CONSTRUCT(&query);
PMIX_ARGV_APPEND(rc, query.keys, PMIX_QUERY_PSET_MEMBERSHIP);
PMIX_INFO_CREATE(query.qualifiers, 2);
PMIX_INFO_LOAD(&query.qualifiers[0], PMIX_PSET_NAME, pset_name, PMIX_STRING);
PMIX_INFO_LOAD(&query.qualifiers[1], PMIX_QUERY_REFRESH_CACHE, &refresh, PMIX_BOOL);
goto fn_try_again;
}
ret = opal_pmix_convert_status(rc);
ompi_instance_print_error ("PMIx_Query_info() failed", ret);
goto fn_w_query;
}

OPAL_PMIX_CONVERT_NAME(&p, &name);
rc = PMIx_Get(&p, PMIX_PSET_NAME, NULL, 0, &pval);
if (OPAL_UNLIKELY(PMIX_SUCCESS != rc)) {
OBJ_RELEASE(group);
return opal_pmix_convert_status(rc);
}
for(n = 0; n < ninfo; n++){
if(0 == strcmp(info[n].key, PMIX_QUERY_PSET_MEMBERSHIP)){

pmix_data_array_t *data_array = info[n].value.data.darray;
pmix_proc_t *members_array = (pmix_proc_t*) data_array->array;

PMIX_VALUE_UNLOAD(rc,
pval,
(void **)&stmp,
&size);
if (0 != strcmp (pset_name, stmp)) {
PMIX_VALUE_RELEASE(pval);
free(stmp);
continue;
}
PMIX_VALUE_RELEASE(pval);
free(stmp);
group = ompi_group_allocate (NULL, data_array->size);
if (OPAL_UNLIKELY(NULL == group)) {
ret = OMPI_ERR_OUT_OF_RESOURCE;
goto fn_w_info;
}

/* look for existing ompi_proc_t that matches this name */
group->grp_proc_pointers[size] = (ompi_proc_t *) ompi_proc_lookup (name);
if (NULL == group->grp_proc_pointers[size]) {
/* set sentinel value */
group->grp_proc_pointers[size] = (ompi_proc_t *) ompi_proc_name_to_sentinel (name);
} else {
OBJ_RETAIN (group->grp_proc_pointers[size]);
for(i = 0; i < data_array->size; i++){
OPAL_PMIX_CONVERT_PROCT(ret, &pname, &members_array[i]);
if (OPAL_SUCCESS == rc) {
group->grp_proc_pointers[i] = ompi_proc_find_and_add(&pname,&isnew);
} else {
ompi_instance_print_error ("OPAL_PMIX_CONVERT_PROCT failed %d", ret);
ompi_group_free(&group);
goto fn_w_info;
}
}
break;
}
++size;
}

/* shrink the proc array if needed */
if (size < (size_t) group->grp_proc_count) {
void *tmp = realloc (group->grp_proc_pointers, size * sizeof (group->grp_proc_pointers[0]));
if (OPAL_UNLIKELY(NULL == tmp)) {
OBJ_RELEASE(group);
return OMPI_ERR_OUT_OF_RESOURCE;
}

group->grp_proc_pointers = (ompi_proc_t **) tmp;
group->grp_proc_count = (int) size;
if (NULL != group) {
ompi_set_group_rank (group, ompi_proc_local());
group->grp_instance = instance;
*group_out = group;
} else {
ret = OMPI_ERR_NOT_FOUND;
}

ompi_set_group_rank (group, ompi_proc_local());

group->grp_instance = instance;
fn_w_info:
PMIX_INFO_DESTRUCT(info);
fn_w_query:
PMIX_QUERY_DESTRUCT(&query);

*group_out = group;
return OMPI_SUCCESS;
return ret;
}

static int ompi_instance_get_pmix_pset_size (ompi_instance_t *instance, const char *pset_name, size_t *size_out)
Expand Down

0 comments on commit 541a17b

Please sign in to comment.