Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ch4/ofi: refactor gpu pipeline #6891

Open
wants to merge 17 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/include/mpiimpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,6 @@ typedef struct MPIR_Stream MPIR_Stream;
/******************* PART 3: DEVICE INDEPENDENT HEADERS **********************/
/*****************************************************************************/

#include "mpir_misc.h"
#include "mpir_dbg.h"
#include "mpir_objects.h"
#include "mpir_strerror.h"
Expand All @@ -166,6 +165,7 @@ typedef struct MPIR_Stream MPIR_Stream;
#include "mpir_mem.h"
#include "mpir_info.h"
#include "mpir_errcodes.h"
#include "mpir_misc.h"
#include "mpir_errhandler.h"
#include "mpir_attr_generic.h"
#include "mpir_contextid.h"
Expand Down
27 changes: 22 additions & 5 deletions src/include/mpir_misc.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,6 @@ extern MPL_initlock_t MPIR_init_lock;

#include "typerep_pre.h" /* needed for MPIR_Typerep_req */

/* FIXME: bad names. Not gpu-specific, confusing with MPIR_Request.
* It's a general async handle.
*/
typedef enum {
MPIR_NULL_REQUEST = 0,
MPIR_TYPEREP_REQUEST,
Expand All @@ -64,7 +61,27 @@ typedef struct {
MPL_gpu_request gpu_req;
} u;
MPIR_request_type_t type;
} MPIR_gpu_req;
} MPIR_async_req;

MPL_STATIC_INLINE_PREFIX void MPIR_async_test(MPIR_async_req * areq, int *is_done)
{
int err;
switch (areq->type) {
case MPIR_NULL_REQUEST:
/* a dummy, immediately complete */
*is_done = 1;
break;
case MPIR_TYPEREP_REQUEST:
MPIR_Typerep_test(areq->u.y_req, is_done);
break;
case MPIR_GPU_REQUEST:
err = MPL_gpu_test(&areq->u.gpu_req, is_done);
MPIR_Assertp(err == MPL_SUCCESS);
break;
default:
MPIR_Assert(0);
}
}

int MPIR_Localcopy(const void *sendbuf, MPI_Aint sendcount, MPI_Datatype sendtype,
void *recvbuf, MPI_Aint recvcount, MPI_Datatype recvtype);
Expand All @@ -82,7 +99,7 @@ int MPIR_Ilocalcopy_gpu(const void *sendbuf, MPI_Aint sendcount, MPI_Datatype se
MPI_Aint sendoffset, MPL_pointer_attr_t * sendattr, void *recvbuf,
MPI_Aint recvcount, MPI_Datatype recvtype, MPI_Aint recvoffset,
MPL_pointer_attr_t * recvattr, MPL_gpu_copy_direction_t dir,
MPL_gpu_engine_type_t enginetype, bool commit, MPIR_gpu_req * req);
MPL_gpu_engine_type_t enginetype, bool commit, MPIR_async_req * req);

/* Contiguous datatype calculates buffer address with `(char *) buf + dt_true_lb`.
* However, dt_true_lb is treated as ptrdiff_t (signed), and when buf is MPI_BOTTOM
Expand Down
2 changes: 0 additions & 2 deletions src/include/mpir_typerep.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,6 @@ int MPIR_Typerep_ipack(const void *inbuf, MPI_Aint incount, MPI_Datatype datatyp
int MPIR_Typerep_iunpack(const void *inbuf, MPI_Aint insize, void *outbuf, MPI_Aint outcount,
MPI_Datatype datatype, MPI_Aint outoffset, MPI_Aint * actual_unpack_bytes,
MPIR_Typerep_req * typerep_req, uint32_t flags);
int MPIR_Typerep_wait(MPIR_Typerep_req typerep_req);
int MPIR_Typerep_test(MPIR_Typerep_req typerep_req, int *completed);

int MPIR_Typerep_size_external32(MPI_Datatype type);
int MPIR_Typerep_pack_external(const void *inbuf, MPI_Aint incount, MPI_Datatype datatype,
Expand Down
3 changes: 3 additions & 0 deletions src/mpi/datatype/typerep/src/typerep_pre.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,7 @@ typedef struct {
#define MPIR_TYPEREP_HANDLE_NULL NULL
#endif

int MPIR_Typerep_wait(MPIR_Typerep_req typerep_req);
int MPIR_Typerep_test(MPIR_Typerep_req typerep_req, int *completed);

#endif /* TYPEREP_PRE_H_INCLUDED */
31 changes: 16 additions & 15 deletions src/mpi/misc/utils.c
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,8 @@ static int do_localcopy_gpu(const void *sendbuf, MPI_Aint sendcount, MPI_Datatyp
MPI_Aint sendoffset, MPL_pointer_attr_t * send_attr, void *recvbuf,
MPI_Aint recvcount, MPI_Datatype recvtype, MPI_Aint recvoffset,
MPL_pointer_attr_t * recv_attr, MPL_gpu_copy_direction_t dir,
MPL_gpu_engine_type_t enginetype, bool commit, MPIR_gpu_req * gpu_req)
MPL_gpu_engine_type_t enginetype, bool commit,
MPIR_async_req * async_req)
{
int mpi_errno = MPI_SUCCESS;
int mpl_errno = MPL_SUCCESS;
Expand All @@ -200,8 +201,8 @@ static int do_localcopy_gpu(const void *sendbuf, MPI_Aint sendcount, MPI_Datatyp

MPIR_FUNC_ENTER;

if (gpu_req)
gpu_req->type = MPIR_NULL_REQUEST;
if (async_req)
async_req->type = MPIR_NULL_REQUEST;

MPIR_Datatype_get_size_macro(sendtype, sendsize);
MPIR_Datatype_get_size_macro(recvtype, recvsize);
Expand Down Expand Up @@ -260,7 +261,7 @@ static int do_localcopy_gpu(const void *sendbuf, MPI_Aint sendcount, MPI_Datatyp
MPIR_ERR_CHKANDJUMP(dev_id == -1, mpi_errno, MPI_ERR_OTHER,
"**mpl_gpu_get_dev_id_from_attr");

if (gpu_req == NULL) {
if (async_req == NULL) {
MPL_gpu_request req;
mpl_errno =
MPL_gpu_imemcpy((char *) MPIR_get_contig_ptr(recvbuf, recvtype_true_lb) +
Expand All @@ -281,8 +282,8 @@ static int do_localcopy_gpu(const void *sendbuf, MPI_Aint sendcount, MPI_Datatyp
recvoffset, (char *) MPIR_get_contig_ptr(sendbuf,
sendtype_true_lb) +
sendoffset, copy_sz, dev_id, dir, enginetype,
&gpu_req->u.gpu_req, commit);
gpu_req->type = MPIR_GPU_REQUEST;
&async_req->u.gpu_req, commit);
async_req->type = MPIR_GPU_REQUEST;
}
}
#else
Expand All @@ -300,15 +301,15 @@ static int do_localcopy_gpu(const void *sendbuf, MPI_Aint sendcount, MPI_Datatyp
fn_fail:
goto fn_exit;
fn_fallback:
if (gpu_req) {
if (async_req) {
mpi_errno =
do_localcopy(sendbuf, sendcount, sendtype, sendoffset, recvbuf, recvcount, recvtype,
recvoffset, LOCALCOPY_NONBLOCKING, &gpu_req->u.y_req);
recvoffset, LOCALCOPY_NONBLOCKING, &async_req->u.y_req);
MPIR_ERR_CHECK(mpi_errno);
if (gpu_req->u.y_req.req == MPIR_TYPEREP_REQ_NULL) {
gpu_req->type = MPIR_NULL_REQUEST;
if (async_req->u.y_req.req == MPIR_TYPEREP_REQ_NULL) {
async_req->type = MPIR_NULL_REQUEST;
} else {
gpu_req->type = MPIR_TYPEREP_REQUEST;
async_req->type = MPIR_TYPEREP_REQUEST;
}
} else {
mpi_errno =
Expand Down Expand Up @@ -414,7 +415,7 @@ int MPIR_Ilocalcopy_gpu(const void *sendbuf, MPI_Aint sendcount, MPI_Datatype se
MPI_Aint sendoffset, MPL_pointer_attr_t * sendattr, void *recvbuf,
MPI_Aint recvcount, MPI_Datatype recvtype, MPI_Aint recvoffset,
MPL_pointer_attr_t * recvattr, MPL_gpu_copy_direction_t dir,
MPL_gpu_engine_type_t enginetype, bool commit, MPIR_gpu_req * req)
MPL_gpu_engine_type_t enginetype, bool commit, MPIR_async_req * async_req)
{
int mpi_errno = MPI_SUCCESS;

Expand All @@ -423,14 +424,14 @@ int MPIR_Ilocalcopy_gpu(const void *sendbuf, MPI_Aint sendcount, MPI_Datatype se
#ifdef MPL_HAVE_GPU
mpi_errno =
do_localcopy_gpu(sendbuf, sendcount, sendtype, sendoffset, sendattr, recvbuf, recvcount,
recvtype, recvoffset, recvattr, dir, enginetype, commit, req);
recvtype, recvoffset, recvattr, dir, enginetype, commit, async_req);
MPIR_ERR_CHECK(mpi_errno);
#else
mpi_errno =
do_localcopy(sendbuf, sendcount, sendtype, sendoffset, recvbuf, recvcount, recvtype,
recvoffset, LOCALCOPY_NONBLOCKING, &req->u.y_req);
recvoffset, LOCALCOPY_NONBLOCKING, &async_req->u.y_req);
MPIR_ERR_CHECK(mpi_errno);
req->type = MPIR_TYPEREP_REQUEST;
async_req->type = MPIR_TYPEREP_REQUEST;
#endif

fn_exit:
Expand Down
1 change: 1 addition & 0 deletions src/mpid/ch4/netmod/ofi/Makefile.mk
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ mpi_core_sources += src/mpid/ch4/netmod/ofi/func_table.c \
src/mpid/ch4/netmod/ofi/ofi_progress.c \
src/mpid/ch4/netmod/ofi/ofi_am_events.c \
src/mpid/ch4/netmod/ofi/ofi_nic.c \
src/mpid/ch4/netmod/ofi/ofi_gpu_pipeline.c \
src/mpid/ch4/netmod/ofi/globals.c \
src/mpid/ch4/netmod/ofi/init_provider.c \
src/mpid/ch4/netmod/ofi/init_settings.c \
Expand Down
3 changes: 3 additions & 0 deletions src/mpid/ch4/netmod/ofi/ofi_comm.c
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,9 @@ int MPIDI_OFI_mpi_comm_commit_pre_hook(MPIR_Comm * comm)
MPIDI_OFI_COMM(comm).enable_hashing = 0;
MPIDI_OFI_COMM(comm).pref_nic = NULL;

/* Initialize tag for gpu_pipeline chunks; incremented by sender. */
MPIDI_OFI_COMM(comm).pipeline_tag = 0;

if (comm->hints[MPIR_COMM_HINT_ENABLE_MULTI_NIC_STRIPING] == -1) {
comm->hints[MPIR_COMM_HINT_ENABLE_MULTI_NIC_STRIPING] =
MPIR_CVAR_CH4_OFI_ENABLE_MULTI_NIC_STRIPING;
Expand Down
165 changes: 3 additions & 162 deletions src/mpid/ch4/netmod/ofi/ofi_events.c
Original file line number Diff line number Diff line change
Expand Up @@ -80,165 +80,6 @@ static int peek_empty_event(int vci, struct fi_cq_tagged_entry *wc, MPIR_Request
return MPI_SUCCESS;
}

/* GPU pipeline events */
static int pipeline_send_event(struct fi_cq_tagged_entry *wc, MPIR_Request * r)
{
int mpi_errno = MPI_SUCCESS;
int c;
MPIDI_OFI_gpu_pipeline_request *req;
MPIR_Request *sreq;
void *wc_buf = NULL;
MPIR_FUNC_ENTER;

req = (MPIDI_OFI_gpu_pipeline_request *) r;
/* get original mpi request */
sreq = req->parent;
wc_buf = req->buf;
MPIDU_genq_private_pool_free_cell(MPIDI_OFI_global.gpu_pipeline_send_pool, wc_buf);

MPIR_cc_decr(sreq->cc_ptr, &c);
if (c == 0) {
MPIR_Datatype_release_if_not_builtin(MPIDI_OFI_REQUEST(sreq, datatype));
MPIR_Request_free(sreq);
}
MPL_free(r);

MPIR_FUNC_EXIT;
return mpi_errno;
}

static int pipeline_recv_event(struct fi_cq_tagged_entry *wc, MPIR_Request * r, int event_id)
{
int mpi_errno = MPI_SUCCESS;
int vci_local, i;
MPIDI_OFI_gpu_pipeline_request *req;
MPIR_Request *rreq;
void *wc_buf = NULL;
int in_use MPL_UNUSED;
MPIDI_OFI_gpu_task_t *task = NULL;
int engine_type = MPIR_CVAR_CH4_OFI_GPU_PIPELINE_H2D_ENGINE_TYPE;

MPIR_FUNC_ENTER;

req = (MPIDI_OFI_gpu_pipeline_request *) r;
rreq = req->parent;
wc_buf = req->buf;
MPL_free(r);

void *recv_buf = MPIDI_OFI_REQUEST(rreq, noncontig.pack.buf);
size_t recv_count = MPIDI_OFI_REQUEST(rreq, noncontig.pack.count);
MPI_Datatype datatype = MPIDI_OFI_REQUEST(rreq, noncontig.pack.datatype);

fi_addr_t remote_addr = MPIDI_OFI_REQUEST(rreq, pipeline_info.remote_addr);
vci_local = MPIDI_OFI_REQUEST(rreq, pipeline_info.vci_local);

if (event_id == MPIDI_OFI_EVENT_RECV_GPU_PIPELINE_INIT) {
rreq->status.MPI_SOURCE = MPIDI_OFI_cqe_get_source(wc, true);
rreq->status.MPI_ERROR = MPIDI_OFI_idata_get_error_bits(wc->data);
rreq->status.MPI_TAG = MPIDI_OFI_init_get_tag(wc->tag);

if (unlikely(MPIDI_OFI_is_tag_sync(wc->tag))) {
MPIDI_OFI_REQUEST(rreq, pipeline_info.is_sync) = true;
}

uint32_t packed = MPIDI_OFI_idata_get_gpu_packed_bit(wc->data);
uint32_t n_chunks = MPIDI_OFI_idata_get_gpuchunk_bits(wc->data);
if (likely(packed == 0)) {
if (wc->len > 0) {
MPIR_Assert(n_chunks == 0);
/* First chunk arrives. */
MPI_Aint actual_unpack_bytes;
MPIR_gpu_req yreq;
mpi_errno =
MPIR_Ilocalcopy_gpu(wc_buf, wc->len, MPI_BYTE, 0, NULL, recv_buf, recv_count,
datatype, 0, NULL, MPL_GPU_COPY_H2D, engine_type, 1, &yreq);
MPIR_ERR_CHECK(mpi_errno);
actual_unpack_bytes = wc->len;
task =
MPIDI_OFI_create_gpu_task(MPIDI_OFI_PIPELINE_RECV, wc_buf,
actual_unpack_bytes, rreq, yreq);
DL_APPEND(MPIDI_OFI_global.gpu_recv_task_queue[vci_local], task);
MPIDI_OFI_REQUEST(rreq, pipeline_info.offset) += (size_t) actual_unpack_bytes;
} else {
/* free this chunk */
MPIDU_genq_private_pool_free_cell(MPIDI_OFI_global.gpu_pipeline_recv_pool, wc_buf);
MPIR_Assert(n_chunks > 0);
/* Post recv for remaining chunks. */
MPIR_cc_dec(rreq->cc_ptr);
for (i = 0; i < n_chunks; i++) {
int c;
MPIR_cc_incr(rreq->cc_ptr, &c);

size_t chunk_sz = MPIR_CVAR_CH4_OFI_GPU_PIPELINE_BUFFER_SZ;

char *host_buf = NULL;
MPIDU_genq_private_pool_alloc_cell(MPIDI_OFI_global.gpu_pipeline_recv_pool,
(void **) &host_buf);

MPIDI_OFI_REQUEST(rreq, event_id) = MPIDI_OFI_EVENT_RECV_GPU_PIPELINE;

MPIDI_OFI_gpu_pipeline_request *chunk_req = NULL;
chunk_req = (MPIDI_OFI_gpu_pipeline_request *)
MPL_malloc(sizeof(MPIDI_OFI_gpu_pipeline_request), MPL_MEM_BUFFER);
if (chunk_req == NULL) {
mpi_errno = MPIR_ERR_OTHER;
goto fn_fail;
}
chunk_req->event_id = MPIDI_OFI_EVENT_RECV_GPU_PIPELINE;
chunk_req->parent = rreq;
chunk_req->buf = host_buf;
int ret = 0;
if (!MPIDI_OFI_global.gpu_recv_queue && host_buf) {
ret = fi_trecv
(MPIDI_OFI_global.ctx
[MPIDI_OFI_REQUEST(rreq, pipeline_info.ctx_idx)].rx,
host_buf, chunk_sz, NULL, remote_addr,
MPIDI_OFI_REQUEST(rreq,
pipeline_info.match_bits) |
MPIDI_OFI_GPU_PIPELINE_SEND, MPIDI_OFI_REQUEST(rreq,
pipeline_info.
mask_bits),
(void *) &chunk_req->context);
}
if (MPIDI_OFI_global.gpu_recv_queue || !host_buf || ret != 0) {
MPIDI_OFI_gpu_pending_recv_t *recv_task =
MPIDI_OFI_create_recv_task(chunk_req, i, n_chunks);
DL_APPEND(MPIDI_OFI_global.gpu_recv_queue, recv_task);
}
}
}
} else {
MPIR_ERR_CHKANDJUMP(true, mpi_errno, MPI_ERR_OTHER, "**gpu_pipeline_packed");
}
} else {
if (likely(event_id == MPIDI_OFI_EVENT_RECV_GPU_PIPELINE)) {
/* FIXME: current design unpacks all bytes from host buffer, overflow check is missing. */
MPI_Aint actual_unpack_bytes;
MPIR_gpu_req yreq;
mpi_errno =
MPIR_Ilocalcopy_gpu(wc_buf, (MPI_Aint) wc->len, MPI_BYTE, 0, NULL,
(char *) recv_buf, (MPI_Aint) recv_count, datatype,
MPIDI_OFI_REQUEST(rreq, pipeline_info.offset), NULL,
MPL_GPU_COPY_H2D, engine_type, 1, &yreq);
MPIR_ERR_CHECK(mpi_errno);
actual_unpack_bytes = wc->len;
MPIDI_OFI_REQUEST(rreq, pipeline_info.offset) += (size_t) actual_unpack_bytes;
task =
MPIDI_OFI_create_gpu_task(MPIDI_OFI_PIPELINE_RECV, wc_buf, actual_unpack_bytes,
rreq, yreq);
DL_APPEND(MPIDI_OFI_global.gpu_recv_task_queue[vci_local], task);
} else {
MPIR_ERR_CHKANDJUMP(true, mpi_errno, MPI_ERR_OTHER, "**gpu_pipeline_packed");
}
}
fn_exit:
MPIR_FUNC_EXIT;
return mpi_errno;
fn_fail:
rreq->status.MPI_ERROR = mpi_errno;
goto fn_exit;
}

static int send_huge_event(int vci, struct fi_cq_tagged_entry *wc, MPIR_Request * sreq)
{
int mpi_errno = MPI_SUCCESS;
Expand Down Expand Up @@ -630,13 +471,13 @@ int MPIDI_OFI_dispatch_function(int vci, struct fi_cq_tagged_entry *wc, MPIR_Req
mpi_errno = am_read_event(vci, wc, req);
goto fn_exit;
} else if (MPIDI_OFI_REQUEST(req, event_id) == MPIDI_OFI_EVENT_SEND_GPU_PIPELINE) {
mpi_errno = pipeline_send_event(wc, req);
mpi_errno = MPIDI_OFI_gpu_pipeline_send_event(wc, req);
goto fn_exit;
} else if (MPIDI_OFI_REQUEST(req, event_id) == MPIDI_OFI_EVENT_RECV_GPU_PIPELINE_INIT) {
mpi_errno = pipeline_recv_event(wc, req, MPIDI_OFI_EVENT_RECV_GPU_PIPELINE_INIT);
mpi_errno = MPIDI_OFI_gpu_pipeline_recv_event(wc, req);
goto fn_exit;
} else if (MPIDI_OFI_REQUEST(req, event_id) == MPIDI_OFI_EVENT_RECV_GPU_PIPELINE) {
mpi_errno = pipeline_recv_event(wc, req, MPIDI_OFI_EVENT_RECV_GPU_PIPELINE);
mpi_errno = MPIDI_OFI_gpu_pipeline_recv_event(wc, req);
goto fn_exit;
} else if (unlikely(1)) {
switch (MPIDI_OFI_REQUEST(req, event_id)) {
Expand Down
Loading