Skip to content

Commit

Permalink
ch4/ofi: refactor gpu pipeline recv_alloc
Browse files Browse the repository at this point in the history
Separate the recv tasks between the initial header and the chunks, since the
two paths clearly separate them.

Use a single async item for all chunk recvs rather than unnecessarily
enqueuing individual chunks since we can track the chunks in the state.
  • Loading branch information
hzhou committed Feb 29, 2024
1 parent fe4d984 commit 14c5ae9
Showing 1 changed file with 56 additions and 27 deletions.
83 changes: 56 additions & 27 deletions src/mpid/ch4/netmod/ofi/ofi_gpu_pipeline.c
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ struct pipeline_header {

static void spawn_send_copy(MPIR_Async_thing * thing, MPIR_Request * sreq, MPIR_async_req * areq,
const void *buf, MPI_Aint chunk_sz);
static int start_recv_chunk(MPIR_Request * rreq, int idx, int n_chunks);
static int start_recv_chunk(MPIR_Request * rreq, int n_chunks);
static int start_recv_copy(MPIR_Request * rreq, void *buf, MPI_Aint chunk_sz,
void *recv_buf, MPI_Aint count, MPI_Datatype datatype);

Expand Down Expand Up @@ -245,14 +245,14 @@ int MPIDI_OFI_gpu_pipeline_send_event(struct fi_cq_tagged_entry *wc, MPIR_Reques

/* ------------------------------------
* recv_alloc: allocate recv chunk buffer and post fi_trecv
* There are actually two async things: issuing the initial recv and
* issuing recvs for the rest of the chunks.
*/
struct recv_alloc {
MPIR_Request *rreq;
struct chunk_req *chunk_req;
int n_chunks;
};

static int recv_alloc_poll(MPIR_Async_thing * thing);
/* the state for recv_init_alloc is just the rreq */

static bool issue_recv_alloc(MPIR_Request * rreq, bool is_init);
static int recv_init_alloc_poll(MPIR_Async_thing * thing);

int MPIDI_OFI_gpu_pipeline_recv(MPIR_Request * rreq,
void *recv_buf, MPI_Aint count, MPI_Datatype datatype,
Expand All @@ -278,45 +278,77 @@ int MPIDI_OFI_gpu_pipeline_recv(MPIR_Request * rreq,
MPIDI_OFI_REQUEST(rreq, noncontig.pack.count) = count;
MPIDI_OFI_REQUEST(rreq, noncontig.pack.datatype) = datatype;

struct recv_alloc *p;
p = MPL_malloc(sizeof(*p), MPL_MEM_OTHER);
MPIR_Assert(p);
mpi_errno = MPIR_Async_things_add(recv_init_alloc_poll, rreq);

p->rreq = rreq;
p->n_chunks = -1; /* it's MPIDI_OFI_EVENT_RECV_GPU_PIPELINE_INIT */
return mpi_errno;
}

mpi_errno = MPIR_Async_things_add(recv_alloc_poll, p);
/* Async-thing poll callback for the initial (header) recv.
 * The state registered with MPIR_Async_things_add() is the rreq itself.
 * Returns MPIR_ASYNC_THING_DONE once the initial recv has been posted,
 * or MPIR_ASYNC_THING_NOPROGRESS so the async engine polls again later. */
static int recv_init_alloc_poll(MPIR_Async_thing * thing)
{
MPIR_Request *rreq = MPIR_Async_thing_get_state(thing);

/* NOTE(review): the following "return mpi_errno" line appears to be
 * diff residue from the removed revision of this code, not part of
 * this function — confirm against the upstream commit. */
return mpi_errno
/* true => post the header recv (MPIDI_OFI_EVENT_RECV_GPU_PIPELINE_INIT path) */
int ret = issue_recv_alloc(rreq, true /* is_init */);
if (ret) {
return MPIR_ASYNC_THING_DONE;
}

return MPIR_ASYNC_THING_NOPROGRESS;
}

/* ---- */
/* State for the single async item that issues all chunk recvs for one
 * request; freed by recv_chunk_alloc_poll once every chunk is issued. */
struct recv_chunk_alloc {
MPIR_Request *rreq;         /* the receive request the chunks belong to */
int n_chunks;               /* total chunks to issue (from the pipeline header) */
int issued_chunks;          /* chunks successfully posted so far */
};

static int recv_chunk_alloc_poll(MPIR_Async_thing * thing);

/* this is called from recv_event */
static int start_recv_chunk(MPIR_Request * rreq, int idx, int n_chunks)
/* Enqueue one async item that will post recvs for all n_chunks chunk
 * buffers of rreq (called from recv_event after the header arrives).
 * Returns MPI_SUCCESS or the error from MPIR_Async_things_add(). */
static int start_recv_chunk(MPIR_Request * rreq, int n_chunks)
{
int mpi_errno = MPI_SUCCESS;

/* NOTE(review): the next line ("struct recv_alloc *p;") looks like diff
 * residue from the removed revision; the live declaration follows it. */
struct recv_alloc *p;
struct recv_chunk_alloc *p;
p = MPL_malloc(sizeof(*p), MPL_MEM_OTHER);
MPIR_Assert(p);

p->rreq = rreq;
p->n_chunks = n_chunks;
p->issued_chunks = 0;

/* NOTE(review): the next line (recv_alloc_poll) also appears to be diff
 * residue; the live call registers recv_chunk_alloc_poll below. */
mpi_errno = MPIR_Async_things_add(recv_alloc_poll, p);
mpi_errno = MPIR_Async_things_add(recv_chunk_alloc_poll, p);

return mpi_errno;
}

static int recv_alloc_poll(MPIR_Async_thing * thing)
/* Async-thing poll callback that issues chunk recvs one at a time.
 * Progress is throttled on num_inrecv; state p is freed here once all
 * n_chunks have been issued. Returns DONE when finished, UPDATED after
 * issuing a chunk with more remaining, NOPROGRESS otherwise. */
static int recv_chunk_alloc_poll(MPIR_Async_thing * thing)
{
/* NOTE(review): the next line ("struct recv_alloc *p = ...") appears to
 * be diff residue from the removed revision; the live line follows. */
struct recv_alloc *p = MPIR_Async_thing_get_state(thing);
struct recv_chunk_alloc *p = MPIR_Async_thing_get_state(thing);
MPIR_Request *rreq = p->rreq;

/* arbitrary threshold */
if (MPIDI_OFI_REQUEST(rreq, pipeline_info.recv.num_inrecv) > 1) {
return MPIR_ASYNC_THING_NOPROGRESS;
}

/* false => post a chunk recv (not the initial header recv) */
bool ret = issue_recv_alloc(rreq, false /* is_init */);
if (ret) {
p->issued_chunks++;
if (p->issued_chunks == p->n_chunks) {
/* all chunks issued; this async item is finished */
MPL_free(p);
return MPIR_ASYNC_THING_DONE;
} else {
return MPIR_ASYNC_THING_UPDATED;
}
}

return MPIR_ASYNC_THING_NOPROGRESS;
}

/* ---- */
static bool issue_recv_alloc(MPIR_Request * rreq, bool is_init)
{
void *host_buf;
MPIDU_genq_private_pool_alloc_cell(MPIDI_OFI_global.gpu_pipeline_recv_pool, &host_buf);
if (!host_buf) {
Expand All @@ -336,7 +368,7 @@ static int recv_alloc_poll(MPIR_Async_thing * thing)
chunk_req->buf = host_buf;

uint64_t match_bits;
if (p->n_chunks == -1) {
if (is_init) {
match_bits = MPIDI_OFI_REQUEST(rreq, pipeline_info.recv.match_bits);
chunk_req->event_id = MPIDI_OFI_EVENT_RECV_GPU_PIPELINE_INIT;
} else {
Expand All @@ -351,17 +383,16 @@ static int recv_alloc_poll(MPIR_Async_thing * thing)
MPID_THREAD_CS_EXIT(VCI, MPIDI_VCI(vci).lock);
if (ret == 0) {
MPIDI_OFI_REQUEST(rreq, pipeline_info.recv.num_inrecv) += 1;
MPL_free(p);
/* chunk_req and host_buf will be freed in recv_events */
return MPIR_ASYNC_THING_DONE;
return true;
}
if (ret != -FI_EAGAIN && ret != -FI_ENOMEM) {
/* unexpected error */
MPIR_Assert(0);
}
MPIDU_genq_private_pool_free_cell(MPIDI_OFI_global.gpu_pipeline_recv_pool, host_buf);
MPL_free(chunk_req);
return MPIR_ASYNC_THING_NOPROGRESS;
return false;
};

/* ------------------------------------
Expand Down Expand Up @@ -405,10 +436,8 @@ int MPIDI_OFI_gpu_pipeline_recv_event(struct fi_cq_tagged_entry *wc, MPIR_Reques
/* There is no data in the init chunk, free the buffer */
MPIDU_genq_private_pool_free_cell(MPIDI_OFI_global.gpu_pipeline_recv_pool, host_buf);
/* Post recv for the remaining chunks. */
for (int i = 0; i < p_hdr->n_chunks; i++) {
mpi_errno = start_recv_chunk(rreq, i, p_hdr->n_chunks);
MPIR_ERR_CHECK(mpi_errno);
}
mpi_errno = start_recv_chunk(rreq, p_hdr->n_chunks);
MPIR_ERR_CHECK(mpi_errno);
}
} else {
MPIR_Assert(event_id == MPIDI_OFI_EVENT_RECV_GPU_PIPELINE);
Expand Down

0 comments on commit 14c5ae9

Please sign in to comment.