diff --git a/ompi/mca/pml/bfo/pml_bfo_recvreq.c b/ompi/mca/pml/bfo/pml_bfo_recvreq.c index 2cf1534b64d..969420efc0b 100644 --- a/ompi/mca/pml/bfo/pml_bfo_recvreq.c +++ b/ompi/mca/pml/bfo/pml_bfo_recvreq.c @@ -154,6 +154,7 @@ static int mca_pml_bfo_recv_request_cancel(struct ompi_request_t* ompi_request, static void mca_pml_bfo_recv_request_construct(mca_pml_bfo_recv_request_t* request) { request->req_recv.req_base.req_type = MCA_PML_REQUEST_RECV; + request->req_recv.req_base.req_ompi.req_start = mca_pml_bfo_start; request->req_recv.req_base.req_ompi.req_free = mca_pml_bfo_recv_request_free; request->req_recv.req_base.req_ompi.req_cancel = mca_pml_bfo_recv_request_cancel; request->req_rdma_cnt = 0; diff --git a/ompi/mca/pml/bfo/pml_bfo_sendreq.c b/ompi/mca/pml/bfo/pml_bfo_sendreq.c index 815097ef78c..67208a9fe4a 100644 --- a/ompi/mca/pml/bfo/pml_bfo_sendreq.c +++ b/ompi/mca/pml/bfo/pml_bfo_sendreq.c @@ -131,6 +131,7 @@ static int mca_pml_bfo_send_request_cancel(struct ompi_request_t* request, int c static void mca_pml_bfo_send_request_construct(mca_pml_bfo_send_request_t* req) { req->req_send.req_base.req_type = MCA_PML_REQUEST_SEND; + req->req_send.req_base.req_ompi.req_start = mca_pml_bfo_start; req->req_send.req_base.req_ompi.req_free = mca_pml_bfo_send_request_free; req->req_send.req_base.req_ompi.req_cancel = mca_pml_bfo_send_request_cancel; req->req_rdma_cnt = 0; diff --git a/ompi/mca/pml/cm/pml_cm_recvreq.c b/ompi/mca/pml/cm/pml_cm_recvreq.c index 707666c6aac..ccece912117 100644 --- a/ompi/mca/pml/cm/pml_cm_recvreq.c +++ b/ompi/mca/pml/cm/pml_cm_recvreq.c @@ -56,6 +56,7 @@ void mca_pml_cm_recv_request_completion(struct mca_mtl_request_t *mtl_request) static void mca_pml_cm_recv_request_construct(mca_pml_cm_thin_recv_request_t* recvreq) { + recvreq->req_base.req_ompi.req_start = mca_pml_cm_start; recvreq->req_base.req_ompi.req_free = mca_pml_cm_recv_request_free; recvreq->req_base.req_ompi.req_cancel = mca_pml_cm_cancel; OBJ_CONSTRUCT( &(recvreq->req_base.req_convertor), opal_convertor_t ); diff --git a/ompi/mca/pml/cm/pml_cm_sendreq.c b/ompi/mca/pml/cm/pml_cm_sendreq.c index 8d0f3bad90f..6d156286f45 100644 --- a/ompi/mca/pml/cm/pml_cm_sendreq.c +++ b/ompi/mca/pml/cm/pml_cm_sendreq.c @@ -63,6 +63,7 @@ mca_pml_cm_send_request_completion(struct mca_mtl_request_t *mtl_request) static void mca_pml_cm_send_request_construct(mca_pml_cm_hvy_send_request_t* sendreq) { /* no need to reinit for every send -- never changes */ + sendreq->req_send.req_base.req_ompi.req_start = mca_pml_cm_start; sendreq->req_send.req_base.req_ompi.req_free = mca_pml_cm_send_request_free; sendreq->req_send.req_base.req_ompi.req_cancel = mca_pml_cm_cancel; } diff --git a/ompi/mca/pml/ob1/pml_ob1_recvreq.c b/ompi/mca/pml/ob1/pml_ob1_recvreq.c index ddd60f263ce..b3153f951f6 100644 --- a/ompi/mca/pml/ob1/pml_ob1_recvreq.c +++ b/ompi/mca/pml/ob1/pml_ob1_recvreq.c @@ -143,6 +143,7 @@ static int mca_pml_ob1_recv_request_cancel(struct ompi_request_t* ompi_request, static void mca_pml_ob1_recv_request_construct(mca_pml_ob1_recv_request_t* request) { /* the request type is set by the superclass */ + request->req_recv.req_base.req_ompi.req_start = mca_pml_ob1_start; request->req_recv.req_base.req_ompi.req_free = mca_pml_ob1_recv_request_free; request->req_recv.req_base.req_ompi.req_cancel = mca_pml_ob1_recv_request_cancel; request->req_rdma_cnt = 0; diff --git a/ompi/mca/pml/ob1/pml_ob1_sendreq.c b/ompi/mca/pml/ob1/pml_ob1_sendreq.c index 96bfa16ddb5..f0a227f5dc9 100644 --- a/ompi/mca/pml/ob1/pml_ob1_sendreq.c +++ b/ompi/mca/pml/ob1/pml_ob1_sendreq.c @@ -132,6 +132,7 @@ static int mca_pml_ob1_send_request_cancel(struct ompi_request_t* request, int c static void mca_pml_ob1_send_request_construct(mca_pml_ob1_send_request_t* req) { req->req_send.req_base.req_type = MCA_PML_REQUEST_SEND; + req->req_send.req_base.req_ompi.req_start = mca_pml_ob1_start; req->req_send.req_base.req_ompi.req_free = mca_pml_ob1_send_request_free; req->req_send.req_base.req_ompi.req_cancel = mca_pml_ob1_send_request_cancel; req->req_rdma_cnt = 0; diff --git a/ompi/mca/pml/pml.h b/ompi/mca/pml/pml.h index 0b70da841b8..243b5993dda 100644 --- a/ompi/mca/pml/pml.h +++ b/ompi/mca/pml/pml.h @@ -69,6 +69,7 @@ #include "ompi/mca/mca.h" #include "mpi.h" /* needed for MPI_ANY_TAG */ #include "ompi/mca/pml/pml_constants.h" +#include "ompi/request/request.h" BEGIN_C_DECLS @@ -350,14 +351,11 @@ typedef int (*mca_pml_base_module_send_fn_t)( /** * Initiate one or more persistent requests. * - * @param count Number of requests - * @param request Array of persistent requests - * @return OMPI_SUCCESS or failure status. + * @param count (IN) Number of requests + * @param requests (IN/OUT) Array of persistent requests + * @return OMPI_SUCCESS or failure status. */ -typedef int (*mca_pml_base_module_start_fn_t)( - size_t count, - struct ompi_request_t** requests -); +typedef ompi_request_start_fn_t mca_pml_base_module_start_fn_t; /** * Probe to poll for pending recv. diff --git a/ompi/mca/pml/ucx/pml_ucx_request.c b/ompi/mca/pml/ucx/pml_ucx_request.c index 01dac786b8b..05533914a4c 100644 --- a/ompi/mca/pml/ucx/pml_ucx_request.c +++ b/ompi/mca/pml/ucx/pml_ucx_request.c @@ -136,6 +136,7 @@ static void mca_pml_ucx_request_init_common(ompi_request_t* ompi_req, OMPI_REQUEST_INIT(ompi_req, req_persistent); ompi_req->req_type = OMPI_REQUEST_PML; ompi_req->req_state = state; + ompi_req->req_start = mca_pml_ucx_start; ompi_req->req_free = req_free; ompi_req->req_cancel = req_cancel; /* This field is used to attach persistant request to a temporary req. diff --git a/ompi/mca/pml/yalla/pml_yalla_request.c b/ompi/mca/pml/yalla/pml_yalla_request.c index f75c2d9b446..a591371551a 100644 --- a/ompi/mca/pml/yalla/pml_yalla_request.c +++ b/ompi/mca/pml/yalla/pml_yalla_request.c @@ -149,6 +149,7 @@ static void init_base_req(mca_pml_yalla_base_request_t *req) { OMPI_REQUEST_INIT(&req->ompi, false); req->ompi.req_type = OMPI_REQUEST_PML; + req->ompi.req_start = mca_pml_yalla_start; req->ompi.req_cancel = NULL; req->ompi.req_complete_cb = NULL; req->ompi.req_complete_cb_data = NULL; diff --git a/ompi/mpi/c/start.c b/ompi/mpi/c/start.c index aa2c8af7b6b..3f1b3658e31 100644 --- a/ompi/mpi/c/start.c +++ b/ompi/mpi/c/start.c @@ -68,7 +68,7 @@ int MPI_Start(MPI_Request *request) case OMPI_REQUEST_PML: OPAL_CR_ENTER_LIBRARY(); - ret = MCA_PML_CALL(start(1, request)); + ret = (*request)->req_start(1, request); OPAL_CR_EXIT_LIBRARY(); return ret; diff --git a/ompi/mpi/c/startall.c b/ompi/mpi/c/startall.c index 34a3fed2364..14452f68de4 100644 --- a/ompi/mpi/c/startall.c +++ b/ompi/mpi/c/startall.c @@ -44,11 +44,11 @@ static const char FUNC_NAME[] = "MPI_Startall"; int MPI_Startall(int count, MPI_Request requests[]) { - int i; + int i, j; int ret = OMPI_SUCCESS; + ompi_request_start_fn_t start_fn = NULL; MEMCHECKER( - int j; for (j = 0; j < count; j++){ memchecker_request(&requests[j]); } @@ -76,7 +76,7 @@ int MPI_Startall(int count, MPI_Request requests[]) OPAL_CR_ENTER_LIBRARY(); - for (i = 0; i < count; ++i) { + for (i = 0, j = -1; i < count; ++i) { /* Per MPI it is invalid to start an active request */ if (OMPI_REQUEST_INACTIVE != requests[i]->req_state) { return OMPI_ERRHANDLER_INVOKE(MPI_COMM_WORLD, MPI_ERR_REQUEST, FUNC_NAME); @@ -91,9 +91,21 @@ int MPI_Startall(int count, MPI_Request requests[]) */ requests[i]->req_state = OMPI_REQUEST_ACTIVE; } + + /* Call a req_start callback function per requests which have the + * same req_start value. */ + if (requests[i]->req_start != start_fn) { + if (NULL != start_fn && i != 0) { + start_fn(i - j, requests + j); + } + start_fn = requests[i]->req_start; + j = i; + } } - ret = MCA_PML_CALL(start(count, requests)); + if (NULL != start_fn) { + start_fn(i - j, requests + j); + } OPAL_CR_EXIT_LIBRARY(); return ret; diff --git a/ompi/request/request.c b/ompi/request/request.c index 82f43209dd5..6c37008473b 100644 --- a/ompi/request/request.c +++ b/ompi/request/request.c @@ -55,6 +55,7 @@ static void ompi_request_construct(ompi_request_t* req) req->req_state = OMPI_REQUEST_INVALID; req->req_complete = false; req->req_persistent = false; + req->req_start = NULL; req->req_free = NULL; req->req_cancel = NULL; req->req_complete_cb = NULL; @@ -123,6 +124,7 @@ int ompi_request_init(void) ompi_request_null.request.req_persistent = false; ompi_request_null.request.req_f_to_c_index = opal_pointer_array_add(&ompi_request_f_to_c_table, &ompi_request_null); + ompi_request_null.request.req_start = NULL; /* should not be called */ ompi_request_null.request.req_free = ompi_request_null_free; ompi_request_null.request.req_cancel = ompi_request_null_cancel; ompi_request_null.request.req_mpi_object.comm = &ompi_mpi_comm_world.comm; @@ -155,6 +157,7 @@ int ompi_request_init(void) ompi_request_empty.req_persistent = false; ompi_request_empty.req_f_to_c_index = opal_pointer_array_add(&ompi_request_f_to_c_table, &ompi_request_empty); + ompi_request_empty.req_start = NULL; /* should not be called */ ompi_request_empty.req_free = ompi_request_empty_free; ompi_request_empty.req_cancel = ompi_request_null_cancel; ompi_request_empty.req_mpi_object.comm = &ompi_mpi_comm_world.comm; diff --git a/ompi/request/request.h b/ompi/request/request.h index 0d0843b6af6..a587950b3c3 100644 --- a/ompi/request/request.h +++ b/ompi/request/request.h @@ -55,6 +55,26 @@ OMPI_DECLSPEC OBJ_CLASS_DECLARATION(ompi_request_t); struct ompi_request_t; +/** + * Initiate one or more persistent requests. + * + * This function is called by MPI_START and MPI_STARTALL. + * + * When called by MPI_START, count is 1. + * + * When called by MPI_STARTALL, multiple requests which have the same + * req_start value are passed. This may help scheduling optimization + * of multiple communications. + * + * @param count (IN) Number of requests + * @param requests (IN/OUT) Array of persistent requests + * @return OMPI_SUCCESS or failure status. + */ +typedef int (*ompi_request_start_fn_t)( + size_t count, + struct ompi_request_t ** requests +); + /* * Required function to free the request and any associated resources. */ @@ -109,6 +129,7 @@ struct ompi_request_t { volatile ompi_request_state_t req_state; /**< enum indicate state of the request */ bool req_persistent; /**< flag indicating if the this is a persistent request */ int req_f_to_c_index; /**< Index in Fortran <-> C translation array */ + ompi_request_start_fn_t req_start; /**< Called by MPI_START and MPI_STARTALL */ ompi_request_free_fn_t req_free; /**< Called by free */ ompi_request_cancel_fn_t req_cancel; /**< Optional function to cancel the request */ ompi_request_complete_fn_t req_complete_cb; /**< Called when the request is MPI completed */