
Commit

Remove the last involvement of the OOB system from the MPI layer, remove the no-longer-needed usock/oob component, and have procs no longer open the RML, OOB, ROUTED, and GRPCOMM frameworks, as PMIx now provides all required app-to-mpirun commands
Ralph Castain committed Sep 15, 2015
1 parent 3b4e982 commit c1bbbb5
Showing 35 changed files with 326 additions and 4,309 deletions.
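
The core change in ompi/communicator/comm_cid.c below replaces the RML/OOB send+recv exchange between the two group leaders with a rendezvous through the PMIx publish/lookup service: each leader packs its contribution into a byte object, publishes it under a key built from the connect/accept port string plus a per-call tag and iteration number, and looks up the complementary key published by its peer. The sketch that follows shows that pattern in isolation, assuming only the opal_dss and opal_pmix interfaces that appear in the diff; the helper name, its argument list, and the surrounding error handling are illustrative and are not part of this commit.

#include <stdio.h>
#include <stdbool.h>
#include "opal/dss/dss.h"
#include "opal/mca/pmix/pmix.h"

/* Hypothetical helper mirroring the new ompi_comm_allreduce_intra_pmix():
 * the two leaders swap a packed array of "count" ints through PMIx
 * publish/lookup instead of an OOB/RML send+recv pair. */
static int leader_exchange_ints(const char *port_string, const char *tag,
                                int iter, bool send_first,
                                int *mine, int *theirs, int32_t count)
{
    opal_buffer_t buf;
    opal_value_t info;        /* the datum we publish */
    opal_pmix_pdata_t pdat;   /* the datum we look up */
    int rc;

    /* pack our contribution and unload it into a byte object */
    OBJ_CONSTRUCT(&buf, opal_buffer_t);
    if (OPAL_SUCCESS != (rc = opal_dss.pack(&buf, mine, count, OPAL_INT))) {
        OBJ_DESTRUCT(&buf);
        return rc;
    }
    OBJ_CONSTRUCT(&info, opal_value_t);
    OBJ_CONSTRUCT(&pdat, opal_pmix_pdata_t);
    info.type = OPAL_BYTE_OBJECT;
    pdat.value.type = OPAL_BYTE_OBJECT;
    opal_dss.unload(&buf, (void**)&info.data.bo.bytes, &info.data.bo.size);
    OBJ_DESTRUCT(&buf);

    /* the initiator publishes under ...:send:iter and looks up ...:recv:iter;
     * the accepting side does the opposite, so the two keys pair up */
    if (send_first) {
        (void)asprintf(&info.key, "%s:%s:send:%d", port_string, tag, iter);
        (void)asprintf(&pdat.value.key, "%s:%s:recv:%d", port_string, tag, iter);
    } else {
        (void)asprintf(&info.key, "%s:%s:recv:%d", port_string, tag, iter);
        (void)asprintf(&pdat.value.key, "%s:%s:send:%d", port_string, tag, iter);
    }

    /* publish our datum and wait (up to 60 seconds) for the peer's */
    OPAL_PMIX_EXCHANGE(rc, &info, &pdat, 60);
    OBJ_DESTRUCT(&info);
    if (OPAL_SUCCESS != rc) {
        OBJ_DESTRUCT(&pdat);
        return rc;
    }

    /* reload the peer's bytes into a buffer and unpack the ints */
    OBJ_CONSTRUCT(&buf, opal_buffer_t);
    opal_dss.load(&buf, pdat.value.data.bo.bytes, pdat.value.data.bo.size);
    pdat.value.data.bo.bytes = NULL;   /* the buffer now owns the bytes */
    pdat.value.data.bo.size = 0;
    OBJ_DESTRUCT(&pdat);
    rc = opal_dss.unpack(&buf, theirs, &count, OPAL_INT);
    OBJ_DESTRUCT(&buf);
    return rc;
}

In comm_cid.c the leader then reduces the peer's values into its own with ompi_op_reduce() and broadcasts the result to the rest of the communicator; the tag ("nextcid" or "activate") and the iteration counter keep successive exchanges over the same port string from colliding in the PMIx key space.
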
18 changes: 0 additions & 18 deletions contrib/platform/intel/bend/linux.conf
@@ -58,26 +58,8 @@
# parameters available and their default values.
#

#default hostfile
#orte_default_hostfile = /home/common/hosts
#ras_slurm_enable_dyn_alloc = 1
#ras_slurm_config_file = /home/common/slurm/conf/slurm.conf

# Basic behavior to smooth startup
mca_base_component_show_load_errors = 1
mpi_param_check = 0
orte_abort_timeout = 10
hwloc_base_mem_bind_failure_action = silent

## Protect the shared file systems

## Add the interface for out-of-band communication
## and set it up
oob_tcp_peer_retries = 120
#oob_tcp_connect_timeout=600

## Define the MPI interconnects
btl = sm,tcp,self

## Setup shared memory
btl_sm_free_list_max = 768
13 changes: 0 additions & 13 deletions contrib/platform/intel/bend/mac-orcm.conf
@@ -60,19 +60,6 @@

# Basic behavior to smooth startup
mca_base_component_show_load_errors = 1
mpi_param_check = 0
orte_abort_timeout = 10
hwloc_base_mem_bind_failure_action = silent

## Protect the shared file systems

## Add the interface for out-of-band communication
## and set it up
oob_tcp_peer_retries = 120
#oob_tcp_connect_timeout=600

## Define the MPI interconnects
btl = sm,tcp,self

## Setup shared memory
btl_sm_free_list_max = 768
14 changes: 0 additions & 14 deletions contrib/platform/intel/bend/mac.conf
@@ -60,19 +60,5 @@

# Basic behavior to smooth startup
mca_base_component_show_load_errors = 1
mpi_param_check = 0
orte_abort_timeout = 10
hwloc_base_mem_bind_failure_action = silent

## Protect the shared file systems

## Add the interface for out-of-band communication
## and set it up
oob_tcp_peer_retries = 120
#oob_tcp_connect_timeout=600

## Define the MPI interconnects
btl = sm,tcp,self

## Setup shared memory
btl_sm_free_list_max = 768
161 changes: 73 additions & 88 deletions ompi/communicator/comm_cid.c
@@ -17,7 +17,7 @@
* Copyright (c) 2012-2014 Los Alamos National Security, LLC. All rights
* reserved.
* Copyright (c) 2012 Oak Ridge National Labs. All rights reserved.
* Copyright (c) 2013-2014 Intel, Inc. All rights reserved.
* Copyright (c) 2013-2015 Intel, Inc. All rights reserved.
* Copyright (c) 2014 Research Organization for Information Science
* and Technology (RIST). All rights reserved.
* $COPYRIGHT$
@@ -30,6 +30,7 @@
#include "ompi_config.h"

#include "opal/dss/dss.h"
#include "opal/mca/pmix/pmix.h"

#include "ompi/proc/proc.h"
#include "ompi/communicator/communicator.h"
@@ -58,47 +59,47 @@ typedef int ompi_comm_cid_allredfct (int *inbuf, int* outbuf,
ompi_communicator_t *comm,
ompi_communicator_t *bridgecomm,
void* lleader, void* rleader,
int send_first );
int send_first, char *tag, int iter );

static int ompi_comm_allreduce_intra (int *inbuf, int* outbuf,
int count, struct ompi_op_t *op,
ompi_communicator_t *intercomm,
ompi_communicator_t *bridgecomm,
void* local_leader,
void* remote_ledaer,
int send_first );
int send_first, char *tag, int iter );

static int ompi_comm_allreduce_inter (int *inbuf, int *outbuf,
int count, struct ompi_op_t *op,
ompi_communicator_t *intercomm,
ompi_communicator_t *bridgecomm,
void* local_leader,
void* remote_leader,
int send_first );
int send_first, char *tag, int iter );

static int ompi_comm_allreduce_intra_bridge(int *inbuf, int* outbuf,
int count, struct ompi_op_t *op,
ompi_communicator_t *intercomm,
ompi_communicator_t *bridgecomm,
void* local_leader,
void* remote_leader,
int send_first);
int send_first, char *tag, int iter);

static int ompi_comm_allreduce_intra_oob (int *inbuf, int* outbuf,
int count, struct ompi_op_t *op,
ompi_communicator_t *intercomm,
ompi_communicator_t *bridgecomm,
void* local_leader,
void* remote_leader,
int send_first );
static int ompi_comm_allreduce_intra_pmix (int *inbuf, int* outbuf,
int count, struct ompi_op_t *op,
ompi_communicator_t *intercomm,
ompi_communicator_t *bridgecomm,
void* local_leader,
void* remote_leader,
int send_first, char *tag, int iter );

static int ompi_comm_allreduce_group (int *inbuf, int* outbuf,
int count, struct ompi_op_t *op,
ompi_communicator_t *intercomm,
ompi_communicator_t *bridgecomm,
void* local_leader,
void* remote_leader,
int send_first);
int send_first, char *tag, int iter);

/* non-blocking intracommunicator allreduce */
static int ompi_comm_allreduce_intra_nb (int *inbuf, int *outbuf,
@@ -158,7 +159,7 @@ int ompi_comm_nextcid ( ompi_communicator_t* newcomm,
int response, glresponse=0;
int start;
unsigned int i;

int iter=0;
ompi_comm_cid_allredfct* allredfnct;

/**
@@ -177,8 +178,8 @@ int ompi_comm_nextcid ( ompi_communicator_t* newcomm,
case OMPI_COMM_CID_INTRA_BRIDGE:
allredfnct=(ompi_comm_cid_allredfct*)ompi_comm_allreduce_intra_bridge;
break;
case OMPI_COMM_CID_INTRA_OOB:
allredfnct=(ompi_comm_cid_allredfct*)ompi_comm_allreduce_intra_oob;
case OMPI_COMM_CID_INTRA_PMIX:
allredfnct=(ompi_comm_cid_allredfct*)ompi_comm_allreduce_intra_pmix;
break;
case OMPI_COMM_CID_GROUP:
allredfnct=(ompi_comm_cid_allredfct*)ompi_comm_allreduce_group;
@@ -218,7 +219,8 @@ int ompi_comm_nextcid ( ompi_communicator_t* newcomm,
}

ret = (allredfnct)(&nextlocal_cid, &nextcid, 1, MPI_MAX, comm, bridgecomm,
local_leader, remote_leader, send_first );
local_leader, remote_leader, send_first, "nextcid", iter );
++iter;
if( OMPI_SUCCESS != ret ) {
opal_pointer_array_set_item(&ompi_mpi_communicators, nextlocal_cid, NULL);
goto release_and_return;
@@ -251,7 +253,8 @@ int ompi_comm_nextcid ( ompi_communicator_t* newcomm,
}

ret = (allredfnct)(&response, &glresponse, 1, MPI_MIN, comm, bridgecomm,
local_leader, remote_leader, send_first );
local_leader, remote_leader, send_first, "nextcid", iter );
++iter;
if( OMPI_SUCCESS != ret ) {
opal_pointer_array_set_item(&ompi_mpi_communicators, nextcid, NULL);
goto release_and_return;
@@ -614,8 +617,8 @@ int ompi_comm_activate ( ompi_communicator_t** newcomm,
case OMPI_COMM_CID_INTRA_BRIDGE:
allredfnct=(ompi_comm_cid_allredfct*)ompi_comm_allreduce_intra_bridge;
break;
case OMPI_COMM_CID_INTRA_OOB:
allredfnct=(ompi_comm_cid_allredfct*)ompi_comm_allreduce_intra_oob;
case OMPI_COMM_CID_INTRA_PMIX:
allredfnct=(ompi_comm_cid_allredfct*)ompi_comm_allreduce_intra_pmix;
break;
case OMPI_COMM_CID_GROUP:
allredfnct=(ompi_comm_cid_allredfct*)ompi_comm_allreduce_group;
@@ -636,7 +639,7 @@ int ompi_comm_activate ( ompi_communicator_t** newcomm,


ret = (allredfnct)(&ok, &gok, 1, MPI_MIN, comm, bridgecomm,
local_leader, remote_leader, send_first );
local_leader, remote_leader, send_first, "activate", 0 );
if( OMPI_SUCCESS != ret ) {
goto bail_on_error;
}
@@ -870,7 +873,7 @@ static int ompi_comm_allreduce_intra ( int *inbuf, int *outbuf,
ompi_communicator_t *bridgecomm,
void* local_leader,
void* remote_leader,
int send_first )
int send_first, char *tag, int iter )
{
return comm->c_coll.coll_allreduce ( inbuf, outbuf, count, MPI_INT, op, comm,
comm->c_coll.coll_allreduce_module );
@@ -899,7 +902,7 @@ static int ompi_comm_allreduce_inter ( int *inbuf, int *outbuf,
ompi_communicator_t *bridgecomm,
void* local_leader,
void* remote_leader,
int send_first )
int send_first, char *tag, int iter )
{
int local_rank, rsize;
int rc;
@@ -1204,7 +1207,7 @@ static int ompi_comm_allreduce_intra_bridge (int *inbuf, int *outbuf,
ompi_communicator_t *comm,
ompi_communicator_t *bcomm,
void* lleader, void* rleader,
int send_first )
int send_first, char *tag, int iter )
{
int *tmpbuf=NULL;
int local_rank;
Expand Down Expand Up @@ -1291,46 +1294,30 @@ static int ompi_comm_allreduce_intra_bridge (int *inbuf, int *outbuf,
return (rc);
}

typedef struct {
opal_buffer_t buf;
bool active;
} comm_cid_return_t;

static void comm_cid_recv(int status,
ompi_process_name_t* peer,
opal_buffer_t* buffer,
ompi_rml_tag_t tag,
void* cbdata)
{
comm_cid_return_t *rcid = (comm_cid_return_t*)cbdata;

opal_dss.copy_payload(&rcid->buf, buffer);
rcid->active = false;
}

/* Arguments not used in this implementation:
* - bridgecomm
*
* lleader is the local rank of root in comm
* rleader is the OOB contact information of the
* root processes in the other world.
* rleader is the port_string
*/
static int ompi_comm_allreduce_intra_oob (int *inbuf, int *outbuf,
int count, struct ompi_op_t *op,
ompi_communicator_t *comm,
ompi_communicator_t *bridgecomm,
void* lleader, void* rleader,
int send_first )
static int ompi_comm_allreduce_intra_pmix (int *inbuf, int *outbuf,
int count, struct ompi_op_t *op,
ompi_communicator_t *comm,
ompi_communicator_t *bridgecomm,
void* lleader, void* rleader,
int send_first, char *tag, int iter )
{
int *tmpbuf=NULL;
int rc;
int local_leader, local_rank;
ompi_process_name_t *remote_leader=NULL;
char *port_string;
opal_value_t info;
opal_pmix_pdata_t pdat;
opal_buffer_t sbuf;
int32_t size_count;
comm_cid_return_t rcid;

local_leader = (*((int*)lleader));
remote_leader = (ompi_process_name_t*)rleader;
port_string = (char*)rleader;
size_count = count;

local_rank = ompi_comm_rank ( comm );
@@ -1348,50 +1335,48 @@ static int ompi_comm_allreduce_intra_oob (int *inbuf, int *outbuf,
}

if (local_rank == local_leader ) {
opal_buffer_t *sbuf;
OBJ_CONSTRUCT(&sbuf, opal_buffer_t);

sbuf = OBJ_NEW(opal_buffer_t);

if (OPAL_SUCCESS != (rc = opal_dss.pack(sbuf, tmpbuf, (int32_t)count, OPAL_INT))) {
if (OPAL_SUCCESS != (rc = opal_dss.pack(&sbuf, tmpbuf, (int32_t)count, OPAL_INT))) {
goto exit;
}
OBJ_CONSTRUCT(&info, opal_value_t);
OBJ_CONSTRUCT(&pdat, opal_pmix_pdata_t);

if ( send_first ) {
if (0 > (rc = ompi_rte_send_buffer_nb(remote_leader, sbuf,
OMPI_RML_TAG_COMM_CID_INTRA,
ompi_rte_send_cbfunc, NULL))) {
goto exit;
}
OBJ_CONSTRUCT(&rcid.buf, opal_buffer_t);
rcid.active = true;
ompi_rte_recv_buffer_nb(remote_leader, OMPI_RML_TAG_COMM_CID_INTRA,
OMPI_RML_NON_PERSISTENT, comm_cid_recv, &rcid);
while (rcid.active) {
opal_progress();
}
}
else {
OBJ_CONSTRUCT(&rcid.buf, opal_buffer_t);
rcid.active = true;
ompi_rte_recv_buffer_nb(remote_leader, OMPI_RML_TAG_COMM_CID_INTRA,
OMPI_RML_NON_PERSISTENT, comm_cid_recv, &rcid);
while (rcid.active) {
opal_progress();
}
if (0 > (rc = ompi_rte_send_buffer_nb(remote_leader, sbuf,
OMPI_RML_TAG_COMM_CID_INTRA,
ompi_rte_send_cbfunc, NULL))) {
goto exit;
}
info.type = OPAL_BYTE_OBJECT;
pdat.value.type = OPAL_BYTE_OBJECT;

opal_dss.unload(&sbuf, (void**)&info.data.bo.bytes, &info.data.bo.size);
OBJ_DESTRUCT(&sbuf);

if (send_first) {
(void)asprintf(&info.key, "%s:%s:send:%d", port_string, tag, iter);
(void)asprintf(&pdat.value.key, "%s:%s:recv:%d", port_string, tag, iter);
} else {
(void)asprintf(&info.key, "%s:%s:recv:%d", port_string, tag, iter);
(void)asprintf(&pdat.value.key, "%s:%s:send:%d", port_string, tag, iter);
}

if (OPAL_SUCCESS != (rc = opal_dss.unpack(&rcid.buf, outbuf, &size_count, OPAL_INT))) {
OPAL_PMIX_EXCHANGE(rc, &info, &pdat, 60);
OBJ_DESTRUCT(&info);

if (OPAL_SUCCESS != rc) {
OBJ_DESTRUCT(&pdat);
goto exit;
}
OBJ_CONSTRUCT(&sbuf, opal_buffer_t);
opal_dss.load(&sbuf, pdat.value.data.bo.bytes, pdat.value.data.bo.size);
pdat.value.data.bo.bytes = NULL;
pdat.value.data.bo.size = 0;
OBJ_DESTRUCT(&pdat);

if (OPAL_SUCCESS != (rc = opal_dss.unpack(&sbuf, outbuf, &size_count, OPAL_INT))) {
OBJ_DESTRUCT(&sbuf);
goto exit;
}
OBJ_DESTRUCT(&rcid.buf);
OBJ_DESTRUCT(&sbuf);
count = (int)size_count;

ompi_op_reduce (op, tmpbuf, outbuf, count, MPI_INT);
ompi_op_reduce (op, tmpbuf, outbuf, count, MPI_INT);
}

rc = comm->c_coll.coll_bcast (outbuf, count, MPI_INT,
@@ -1412,7 +1397,7 @@ static int ompi_comm_allreduce_group (int *inbuf, int* outbuf,
ompi_communicator_t *newcomm,
void* local_leader,
void* remote_leader,
int send_first)
int send_first, char *intag, int iter)
{
ompi_group_t *group = newcomm->c_local_group;
int peers_group[3], peers_comm[3];
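
One detail worth noting in the diff above: every allreduce callback now carries a tag and an iteration number, and the call sites pass ("nextcid", iter) or ("activate", 0). Those two fields namespace the PMIx keys, so repeated exchanges between the same pair of leaders over one port string never reuse a key. A small, standalone illustration follows; the port string value is assumed for the example, since the real one is supplied by the connect/accept code.

#define _GNU_SOURCE   /* for asprintf with glibc */
#include <stdio.h>
#include <stdlib.h>

int main(void)
{
    /* assumed example value; not taken from the commit */
    const char *port_string = "1234567890.0:54321";
    char *pub_key, *lookup_key;

    /* initiator side (send_first == true) of the first "nextcid" exchange */
    (void)asprintf(&pub_key,    "%s:%s:send:%d", port_string, "nextcid", 0);
    (void)asprintf(&lookup_key, "%s:%s:recv:%d", port_string, "nextcid", 0);

    /* prints 1234567890.0:54321:nextcid:send:0 and ...:nextcid:recv:0 */
    printf("publish %s\nlookup  %s\n", pub_key, lookup_key);
    free(pub_key);
    free(lookup_key);
    return 0;
}
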
(The remaining 31 of the 35 changed files in this commit are not shown here.)
