osc/rdma: bug fixes
This commit fixes the following bugs:

 - Allow a BTL to be used for communication if it can communicate with
   all non-self peers and it supports global atomic visibility. In
   this case CPU atomics can be used for self and the BTL for any
   other peer.

 - It was possible to get into a state where different threads of an
   MPI process could issue conflicting accumulate operations to a
   remote peer. To eliminate this race we now update the peer flags
   atomically.

 - Queue up and re-issue put operations that failed during a BTL
   callback. This can occur during an accumulate operation. This was
   an unhandled error case.

Signed-off-by: Nathan Hjelm <[email protected]>
hjelmn committed Nov 29, 2017
1 parent 67e26b6 commit 45db363
Showing 5 changed files with 145 additions and 21 deletions.
112 changes: 100 additions & 12 deletions ompi/mca/osc/rdma/osc_rdma_accumulate.c
@@ -1,6 +1,6 @@
/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */
/*
* Copyright (c) 2014-2016 Los Alamos National Security, LLC. All rights
* Copyright (c) 2014-2017 Los Alamos National Security, LLC. All rights
* reserved.
* Copyright (c) 2016-2017 Research Organization for Information Science
* and Technology (RIST). All rights reserved.
@@ -18,6 +18,90 @@

#include "ompi/mca/osc/base/osc_base_obj_convert.h"

enum ompi_osc_rdma_event_type_t {
OMPI_OSC_RDMA_EVENT_TYPE_PUT,
};

typedef enum ompi_osc_rdma_event_type_t ompi_osc_rdma_event_type_t;

struct ompi_osc_rdma_event_t {
opal_event_t super;
ompi_osc_rdma_module_t *module;
struct mca_btl_base_endpoint_t *endpoint;
void *local_address;
mca_btl_base_registration_handle_t *local_handle;
uint64_t remote_address;
mca_btl_base_registration_handle_t *remote_handle;
uint64_t length;
mca_btl_base_rdma_completion_fn_t cbfunc;
void *cbcontext;
void *cbdata;
};

typedef struct ompi_osc_rdma_event_t ompi_osc_rdma_event_t;

static void *ompi_osc_rdma_event_put (int fd, int flags, void *context)
{
ompi_osc_rdma_event_t *event = (ompi_osc_rdma_event_t *) context;
int ret;

ret = event->module->selected_btl->btl_put (event->module->selected_btl, event->endpoint, event->local_address,
event->remote_address, event->local_handle, event->remote_handle,
event->length, 0, MCA_BTL_NO_ORDER, event->cbfunc, event->cbcontext,
event->cbdata);
if (OPAL_LIKELY(OPAL_SUCCESS == ret)) {
/* done with this event */
opal_event_del (&event->super);
free (event);
} else {
/* re-activate the event */
opal_event_active (&event->super, OPAL_EV_READ, 1);
}

return NULL;
}

static int ompi_osc_rdma_event_queue (ompi_osc_rdma_module_t *module, struct mca_btl_base_endpoint_t *endpoint,
ompi_osc_rdma_event_type_t event_type, void *local_address, mca_btl_base_registration_handle_t *local_handle,
uint64_t remote_address, mca_btl_base_registration_handle_t *remote_handle,
uint64_t length, mca_btl_base_rdma_completion_fn_t cbfunc, void *cbcontext,
void *cbdata)
{
ompi_osc_rdma_event_t *event = malloc (sizeof (*event));
void *(*event_func) (int, int, void *);

if (OPAL_UNLIKELY(NULL == event)) {
return OMPI_ERR_OUT_OF_RESOURCE;
}

event->module = module;
event->endpoint = endpoint;
event->local_address = local_address;
event->local_handle = local_handle;
event->remote_address = remote_address;
event->remote_handle = remote_handle;
event->length = length;
event->cbfunc = cbfunc;
event->cbcontext = cbcontext;
event->cbdata = cbdata;

switch (event_type) {
case OMPI_OSC_RDMA_EVENT_TYPE_PUT:
event_func = ompi_osc_rdma_event_put;
break;
default:
opal_output(0, "osc/rdma: cannot queue unknown event type %d", event_type);
abort ();
}

opal_event_set (opal_sync_event_base, &event->super, -1, OPAL_EV_READ,
event_func, event);
opal_event_active (&event->super, OPAL_EV_READ, 1);

return OMPI_SUCCESS;
}


static int ompi_osc_rdma_gacc_local (const void *source_buffer, int source_count, ompi_datatype_t *source_datatype,
void *result_buffer, int result_count, ompi_datatype_t *result_datatype,
ompi_osc_rdma_peer_t *peer, uint64_t target_address,
@@ -113,7 +197,7 @@ static void ompi_osc_rdma_acc_put_complete (struct mca_btl_base_module_t *btl, s
}

ompi_osc_rdma_sync_rdma_dec (sync);
peer->flags &= ~OMPI_OSC_RDMA_PEER_ACCUMULATING;
ompi_osc_rdma_peer_clear_flag (peer, OMPI_OSC_RDMA_PEER_ACCUMULATING);
}

/* completion of an accumulate get operation */
@@ -171,7 +255,12 @@ static void ompi_osc_rdma_acc_get_complete (struct mca_btl_base_module_t *btl, s
(mca_btl_base_registration_handle_t *) request->ctx,
request->len, 0, MCA_BTL_NO_ORDER, ompi_osc_rdma_acc_put_complete,
request, NULL);
/* TODO -- we can do better. probably should queue up the next step and handle it in progress */
if (OPAL_SUCCESS != status) {
status = ompi_osc_rdma_event_queue (module, endpoint, OMPI_OSC_RDMA_EVENT_TYPE_PUT, (void *) source, local_handle,
request->target_address, (mca_btl_base_registration_handle_t *) request->ctx,
request->len, ompi_osc_rdma_acc_put_complete, request, NULL);
}

assert (OPAL_SUCCESS == status);
}

@@ -203,13 +292,12 @@ static inline int ompi_osc_rdma_gacc_contig (ompi_osc_rdma_sync_t *sync, const v

OPAL_THREAD_LOCK(&module->lock);
/* to ensure order wait until the previous accumulate completes */
while (ompi_osc_rdma_peer_is_accumulating (peer)) {
while (!ompi_osc_rdma_peer_test_set_flag (peer, OMPI_OSC_RDMA_PEER_ACCUMULATING)) {
OPAL_THREAD_UNLOCK(&module->lock);
ompi_osc_rdma_progress (module);
OPAL_THREAD_LOCK(&module->lock);
}

peer->flags |= OMPI_OSC_RDMA_PEER_ACCUMULATING;
OPAL_THREAD_UNLOCK(&module->lock);

if (!ompi_osc_rdma_peer_is_exclusive (peer)) {
@@ -847,11 +935,12 @@ static void ompi_osc_rdma_cas_get_complete (struct mca_btl_base_module_t *btl, s
ompi_osc_rdma_acc_put_complete, request, NULL);
if (OPAL_UNLIKELY(OPAL_SUCCESS != ret)) {
OSC_RDMA_VERBOSE(MCA_BASE_VERBOSE_ERROR, "could not start put to complete accumulate operation. opal return code "
"%d", ret);
}
"%d. queuing operation...", ret);

/* TODO -- we can do better. probably should queue up the next step and handle it in progress */
assert (OPAL_SUCCESS == ret);
ret = ompi_osc_rdma_event_queue (module, peer->data_endpoint, OMPI_OSC_RDMA_EVENT_TYPE_PUT, local_address, local_handle,
request->target_address, (mca_btl_base_registration_handle_t *) request->ctx, request->len,
ompi_osc_rdma_acc_put_complete, request, NULL);
}

return;
}
@@ -868,7 +957,7 @@ static void ompi_osc_rdma_cas_get_complete (struct mca_btl_base_module_t *btl, s
ompi_osc_rdma_request_complete (request, status);

ompi_osc_rdma_sync_rdma_dec (sync);
peer->flags &= ~OMPI_OSC_RDMA_PEER_ACCUMULATING;
ompi_osc_rdma_peer_clear_flag (peer, OMPI_OSC_RDMA_PEER_ACCUMULATING);
}

static inline int cas_rdma (ompi_osc_rdma_sync_t *sync, const void *source_addr, const void *compare_addr, void *result_addr,
@@ -894,12 +983,11 @@ static inline int cas_rdma (ompi_osc_rdma_sync_t *sync, const void *source_addr,

OPAL_THREAD_LOCK(&module->lock);
/* to ensure order wait until the previous accumulate completes */
while (ompi_osc_rdma_peer_is_accumulating (peer)) {
while (!ompi_osc_rdma_peer_test_set_flag (peer, OMPI_OSC_RDMA_PEER_ACCUMULATING)) {
OPAL_THREAD_UNLOCK(&module->lock);
ompi_osc_rdma_progress (module);
OPAL_THREAD_LOCK(&module->lock);
}
peer->flags |= OMPI_OSC_RDMA_PEER_ACCUMULATING;
OPAL_THREAD_UNLOCK(&module->lock);

offset = target_address & btl_alignment_mask;
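The event-queue helpers added above let a put that fails inside a BTL completion callback be parked and re-issued later instead of tripping the old assert. The sketch below is a rough, self-contained illustration of that deferred-retry idea in plain C, not the OPAL event machinery the commit actually uses; try_put, pending_put_t, queue_put, and progress_pending_puts are hypothetical names chosen only to show the pattern of queueing the operation and re-issuing it from a progress path.

#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>

/* hypothetical stand-in for a BTL put that may hit a transient resource error */
static int try_put (int peer, const void *buf, size_t len)
{
    static int transient_failures = 2;
    if (transient_failures-- > 0) {
        return -1;               /* could not start the put right now */
    }
    printf ("put of %zu bytes to peer %d issued\n", len, peer);
    return 0;
}

/* a put that could not be started from a completion callback */
typedef struct pending_put_t {
    struct pending_put_t *next;
    int peer;
    const void *buf;
    size_t len;
} pending_put_t;

static pending_put_t *pending_head;

/* queue the operation instead of failing inside the callback */
static void queue_put (int peer, const void *buf, size_t len)
{
    pending_put_t *op = malloc (sizeof (*op));
    op->peer = peer;
    op->buf = buf;
    op->len = len;
    op->next = pending_head;
    pending_head = op;
}

/* called from the library's progress path: re-issue queued puts */
static void progress_pending_puts (void)
{
    pending_put_t **prev = &pending_head;
    while (*prev) {
        pending_put_t *op = *prev;
        if (0 == try_put (op->peer, op->buf, op->len)) {
            *prev = op->next;    /* done: unlink and free */
            free (op);
        } else {
            prev = &op->next;    /* still failing: keep it queued */
        }
    }
}

int main (void)
{
    char payload[64] = {0};

    if (0 != try_put (1, payload, sizeof (payload))) {
        queue_put (1, payload, sizeof (payload));
    }

    while (pending_head) {
        progress_pending_puts ();
    }

    return 0;
}

The commit itself hands the retry to an OPAL event that is re-activated until btl_put succeeds; the list-plus-progress version above keeps the example free of OPAL dependencies while preserving the same control flow.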
2 changes: 1 addition & 1 deletion ompi/mca/osc/rdma/osc_rdma_comm.c
@@ -160,7 +160,7 @@ static int ompi_osc_rdma_master_noncontig (ompi_osc_rdma_sync_t *sync, void *loc

subreq = NULL;

OSC_RDMA_VERBOSE(MCA_BASE_VERBOSE_TRACE, "scheduling rdma on non-contiguous datatype(s)");
OSC_RDMA_VERBOSE(MCA_BASE_VERBOSE_TRACE, "scheduling rdma on non-contiguous datatype(s) or large region");

/* prepare convertors for the source and target. these convertors will be used to determine the
* contiguous segments within the source and target. */
9 changes: 8 additions & 1 deletion ompi/mca/osc/rdma/osc_rdma_component.c
@@ -850,11 +850,18 @@ static int ompi_osc_rdma_query_btls (ompi_communicator_t *comm, struct mca_btl_b
}

for (int i = 0 ; i < max_btls ; ++i) {
int btl_count = btl_counts[i];

if (NULL == possible_btls[i]) {
break;
}

if (btl_counts[i] == comm_size && possible_btls[i]->btl_latency < selected_latency) {
if (possible_btls[i]->btl_atomic_flags & MCA_BTL_ATOMIC_SUPPORTS_GLOB) {
/* do not need to use the btl for self communication */
btl_count++;
}

if (btl_count >= comm_size && possible_btls[i]->btl_latency < selected_latency) {
selected_btl = possible_btls[i];
selected_latency = possible_btls[i]->btl_latency;
}
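With the hunk above, a BTL that cannot reach the local rank can still be selected when it advertises MCA_BTL_ATOMIC_SUPPORTS_GLOB, because self communication can then fall back to CPU atomics. A minimal standalone sketch of that selection rule follows; btl_info_t, its fields, and select_btl are illustrative names, not the component's actual data structures.

#include <stdbool.h>
#include <float.h>

/* illustrative summary of what the component tracks per candidate BTL */
typedef struct {
    int    reachable_peers;         /* ranks this BTL can reach directly */
    bool   supports_global_atomics; /* atomic results visible to CPU loads/stores */
    double latency;                 /* lower is better */
} btl_info_t;

/* pick the lowest-latency BTL that covers the whole communicator,
 * counting "self" as covered when global atomic visibility lets the
 * window use CPU atomics for the local rank */
static int select_btl (const btl_info_t *btls, int nbtls, int comm_size)
{
    int best = -1;
    double best_latency = DBL_MAX;

    for (int i = 0 ; i < nbtls ; ++i) {
        int covered = btls[i].reachable_peers;

        if (btls[i].supports_global_atomics) {
            covered++;              /* the BTL is not needed for self */
        }

        if (covered >= comm_size && btls[i].latency < best_latency) {
            best = i;
            best_latency = btls[i].latency;
        }
    }

    return best;                    /* -1 if no candidate covers the communicator */
}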
3 changes: 2 additions & 1 deletion ompi/mca/osc/rdma/osc_rdma_peer.c
@@ -61,7 +61,8 @@ int ompi_osc_rdma_new_peer (struct ompi_osc_rdma_module_t *module, int peer_id,
*peer_out = NULL;

endpoint = ompi_osc_rdma_peer_btl_endpoint (module, peer_id);
if (OPAL_UNLIKELY(NULL == endpoint)) {
if (OPAL_UNLIKELY(NULL == endpoint && !((module->selected_btl->btl_atomic_flags & MCA_BTL_ATOMIC_SUPPORTS_GLOB) &&
peer_id == ompi_comm_rank (module->comm)))) {
return OMPI_ERR_UNREACH;
}

40 changes: 34 additions & 6 deletions ompi/mca/osc/rdma/osc_rdma_peer.h
@@ -1,6 +1,6 @@
/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */
/*
* Copyright (c) 2014-2015 Los Alamos National Security, LLC. All rights
* Copyright (c) 2014-2017 Los Alamos National Security, LLC. All rights
* reserved.
* $COPYRIGHT$
*
@@ -40,7 +40,7 @@ struct ompi_osc_rdma_peer_t {
int rank;

/** peer flags */
int flags;
volatile int32_t flags;

/** aggregation support */
ompi_osc_rdma_aggregation_t *aggregate;
@@ -188,13 +188,41 @@ static inline bool ompi_osc_rdma_peer_is_exclusive (ompi_osc_rdma_peer_t *peer)
}

/**
* @brief check if this process is currently accumulating on a peer
* @brief try to set a flag on a peer object
*
* @param[in] peer peer object to check
* @param[in] peer peer object to modify
* @param[in] flag flag to set
*
* @returns true if the flag was not already set
* @returns false otherwise
*/
static inline bool ompi_osc_rdma_peer_test_set_flag (ompi_osc_rdma_peer_t *peer, int flag)
{
int32_t flags;

opal_atomic_mb ();

do {
flags = peer->flags;
if (flags & flag) {
return false;
}

} while (!OPAL_THREAD_BOOL_CMPSET_32 (&peer->flags, flags, flags | flag));

return true;
}

/**
* @brief clear a flag from a peer object
*
* @param[in] peer peer object to modify
* @param[in] flag flag to clear
*/
static inline bool ompi_osc_rdma_peer_is_accumulating (ompi_osc_rdma_peer_t *peer)
static inline void ompi_osc_rdma_peer_clear_flag (ompi_osc_rdma_peer_t *peer, int flag)
{
return !!(peer->flags & OMPI_OSC_RDMA_PEER_ACCUMULATING);
OPAL_ATOMIC_AND32(&peer->flags, ~flag);
opal_atomic_mb ();
}

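The test-set/clear pair above is what makes the ACCUMULATING hand-off race-free: only one thread can win the flag, and the completion callback releases it. Below is a minimal C11 stdatomic analogue of that pattern, written as a standalone sketch rather than with the OPAL atomics used in the diff; peer_t, peer_test_set_flag, and peer_clear_flag are illustrative names.

#include <stdatomic.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

#define PEER_ACCUMULATING 0x80

typedef struct {
    _Atomic int32_t flags;
} peer_t;

/* try to claim a flag; returns true only for the thread that set it */
static bool peer_test_set_flag (peer_t *peer, int32_t flag)
{
    int32_t old = atomic_load (&peer->flags);

    do {
        if (old & flag) {
            return false;                     /* someone else holds it */
        }
    } while (!atomic_compare_exchange_weak (&peer->flags, &old, old | flag));

    return true;
}

/* release the flag (e.g. from the put/atomic completion callback) */
static void peer_clear_flag (peer_t *peer, int32_t flag)
{
    atomic_fetch_and (&peer->flags, ~flag);
}

int main (void)
{
    peer_t peer = { .flags = 0 };

    /* only the first test-and-set wins; the loser would call progress and retry */
    printf ("first claim:  %d\n", peer_test_set_flag (&peer, PEER_ACCUMULATING)); /* 1 */
    printf ("second claim: %d\n", peer_test_set_flag (&peer, PEER_ACCUMULATING)); /* 0 */

    peer_clear_flag (&peer, PEER_ACCUMULATING);
    printf ("after clear:  %d\n", peer_test_set_flag (&peer, PEER_ACCUMULATING)); /* 1 */

    return 0;
}

In the gacc_contig and cas_rdma hunks the losing thread drops the module lock, calls ompi_osc_rdma_progress, and retries the test-and-set until the completion callback clears the flag, which is exactly the retry loop shown in the diff above.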
