diff --git a/ompi/mca/rte/orte/rte_orte_module.c b/ompi/mca/rte/orte/rte_orte_module.c index c82b25ce400..ce35edfda46 100644 --- a/ompi/mca/rte/orte/rte_orte_module.c +++ b/ompi/mca/rte/orte/rte_orte_module.c @@ -113,7 +113,7 @@ void ompi_rte_wait_for_debugger(void) debugger = 1; } - if (!debugger) { + if (!debugger && NULL == getenv("ORTE_TEST_DEBUGGER_ATTACH")) { /* if not, just return */ return; } diff --git a/orte/mca/ess/base/ess_base_std_app.c b/orte/mca/ess/base/ess_base_std_app.c index 1af5429afc8..446ffd67819 100644 --- a/orte/mca/ess/base/ess_base_std_app.c +++ b/orte/mca/ess/base/ess_base_std_app.c @@ -47,9 +47,13 @@ #include "opal/runtime/opal.h" #include "opal/runtime/opal_progress_threads.h" +#include "orte/mca/rml/base/base.h" +#include "orte/mca/routed/base/base.h" #include "orte/mca/errmgr/errmgr.h" #include "orte/mca/dfs/base/base.h" #include "orte/mca/grpcomm/base/base.h" +#include "orte/mca/oob/base/base.h" +#include "orte/mca/rml/rml.h" #include "orte/mca/odls/odls_types.h" #include "orte/mca/filem/base/base.h" #include "orte/mca/errmgr/base/base.h" @@ -173,14 +177,73 @@ int orte_ess_base_app_setup(bool db_restrict_local) } OBJ_DESTRUCT(&kv); } - + /* Setup the communication infrastructure */ + /* + * OOB Layer + */ + if (ORTE_SUCCESS != (ret = mca_base_framework_open(&orte_oob_base_framework, 0))) { + ORTE_ERROR_LOG(ret); + error = "orte_oob_base_open"; + goto error; + } + if (ORTE_SUCCESS != (ret = orte_oob_base_select())) { + ORTE_ERROR_LOG(ret); + error = "orte_oob_base_select"; + goto error; + } + /* Runtime Messaging Layer */ + if (ORTE_SUCCESS != (ret = mca_base_framework_open(&orte_rml_base_framework, 0))) { + ORTE_ERROR_LOG(ret); + error = "orte_rml_base_open"; + goto error; + } + if (ORTE_SUCCESS != (ret = orte_rml_base_select())) { + ORTE_ERROR_LOG(ret); + error = "orte_rml_base_select"; + goto error; + } /* setup the errmgr */ if (ORTE_SUCCESS != (ret = orte_errmgr_base_select())) { ORTE_ERROR_LOG(ret); error = "orte_errmgr_base_select"; goto error; } - + /* Routed system */ + if (ORTE_SUCCESS != (ret = mca_base_framework_open(&orte_routed_base_framework, 0))) { + ORTE_ERROR_LOG(ret); + error = "orte_routed_base_open"; + goto error; + } + if (ORTE_SUCCESS != (ret = orte_routed_base_select())) { + ORTE_ERROR_LOG(ret); + error = "orte_routed_base_select"; + goto error; + } + /* + * Group communications + */ + if (ORTE_SUCCESS != (ret = mca_base_framework_open(&orte_grpcomm_base_framework, 0))) { + ORTE_ERROR_LOG(ret); + error = "orte_grpcomm_base_open"; + goto error; + } + if (ORTE_SUCCESS != (ret = orte_grpcomm_base_select())) { + ORTE_ERROR_LOG(ret); + error = "orte_grpcomm_base_select"; + goto error; + } + /* enable communication via the rml */ + if (ORTE_SUCCESS != (ret = orte_rml.enable_comm())) { + ORTE_ERROR_LOG(ret); + error = "orte_rml.enable_comm"; + goto error; + } + /* setup the routed info */ + if (ORTE_SUCCESS != (ret = orte_routed.init_routes(ORTE_PROC_MY_NAME->jobid, NULL))) { + ORTE_ERROR_LOG(ret); + error = "orte_routed.init_routes"; + goto error; + } /* open the distributed file system */ if (ORTE_SUCCESS != (ret = mca_base_framework_open(&orte_dfs_base_framework, 0))) { ORTE_ERROR_LOG(ret); @@ -216,7 +279,13 @@ int orte_ess_base_app_finalize(void) (void) mca_base_framework_close(&orte_filem_base_framework); (void) mca_base_framework_close(&orte_errmgr_base_framework); + /* now can close the rml and its friendly group comm */ + (void) mca_base_framework_close(&orte_grpcomm_base_framework); (void) mca_base_framework_close(&orte_dfs_base_framework); + (void) mca_base_framework_close(&orte_routed_base_framework); + + (void) mca_base_framework_close(&orte_rml_base_framework); + (void) mca_base_framework_close(&orte_oob_base_framework); (void) mca_base_framework_close(&orte_state_base_framework); orte_session_dir_finalize(ORTE_PROC_MY_NAME); @@ -270,7 +339,7 @@ void orte_ess_base_app_abort(int status, bool report) * the message if routing is enabled as this indicates we * have someone to send to */ - if (report && orte_create_session_dirs) { + if (report && orte_routing_is_enabled && orte_create_session_dirs) { myfile = opal_os_path(false, orte_process_info.proc_session_dir, "aborted", NULL); fd = open(myfile, O_CREAT, S_IRUSR); close(fd); diff --git a/orte/mca/ess/pmi/ess_pmi_module.c b/orte/mca/ess/pmi/ess_pmi_module.c index 6723ed47d99..8b48e45952d 100644 --- a/orte/mca/ess/pmi/ess_pmi_module.c +++ b/orte/mca/ess/pmi/ess_pmi_module.c @@ -47,6 +47,8 @@ #include "opal/mca/pmix/base/base.h" #include "orte/mca/errmgr/errmgr.h" +#include "orte/mca/grpcomm/grpcomm.h" +#include "orte/mca/rml/rml.h" #include "orte/util/proc_info.h" #include "orte/util/show_help.h" #include "orte/util/name_fns.h" @@ -83,6 +85,7 @@ static int rte_init(void) char *envar, *ev1, *ev2; uint64_t unique_key[2]; char *string_key; + char *rmluri; opal_value_t *kv; char *val; int u32, *u32ptr; @@ -358,6 +361,16 @@ static int rte_init(void) /*** PUSH DATA FOR OTHERS TO FIND ***/ + /* push our RML URI in case others need to talk directly to us */ + rmluri = orte_rml.get_contact_info(); + /* push it out for others to use */ + OPAL_MODEX_SEND_VALUE(ret, OPAL_PMIX_GLOBAL, OPAL_PMIX_PROC_URI, rmluri, OPAL_STRING); + if (ORTE_SUCCESS != ret) { + error = "pmix put uri"; + goto error; + } + free(rmluri); + /* push our hostname so others can find us, if they need to */ OPAL_MODEX_SEND_VALUE(ret, OPAL_PMIX_GLOBAL, OPAL_PMIX_HOSTNAME, orte_process_info.nodename, OPAL_STRING); if (ORTE_SUCCESS != ret) { diff --git a/orte/mca/oob/usock/Makefile.am b/orte/mca/oob/usock/Makefile.am new file mode 100644 index 00000000000..b44934e8b6a --- /dev/null +++ b/orte/mca/oob/usock/Makefile.am @@ -0,0 +1,56 @@ +# +# Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana +# University Research and Technology +# Corporation. All rights reserved. +# Copyright (c) 2004-2005 The University of Tennessee and The University +# of Tennessee Research Foundation. All rights +# reserved. +# Copyright (c) 2004-2009 High Performance Computing Center Stuttgart, +# University of Stuttgart. All rights reserved. +# Copyright (c) 2004-2005 The Regents of the University of California. +# All rights reserved. +# Copyright (c) 2010 Cisco Systems, Inc. All rights reserved. +# Copyright (c) 2012-2013 Los Alamos National Security, LLC. +# All rights reserved +# Copyright (c) 2013-2015 Intel, Inc. All rights reserved. +# $COPYRIGHT$ +# +# Additional copyrights may follow +# +# $HEADER$ +# + +sources = \ + oob_usock_component.h \ + oob_usock.h \ + oob_usock_component.c \ + oob_usock_connection.h \ + oob_usock_sendrecv.h \ + oob_usock_hdr.h \ + oob_usock_peer.h \ + oob_usock_ping.h \ + oob_usock.c \ + oob_usock_connection.c \ + oob_usock_sendrecv.c + +# Make the output library in this directory, and name it either +# mca__.la (for DSO builds) or libmca__.la +# (for static builds). + +if MCA_BUILD_orte_oob_usock_DSO +component_noinst = +component_install = mca_oob_usock.la +else +component_noinst = libmca_oob_usock.la +component_install = +endif + +mcacomponentdir = $(ortelibdir) +mcacomponent_LTLIBRARIES = $(component_install) +mca_oob_usock_la_SOURCES = $(sources) +mca_oob_usock_la_LDFLAGS = -module -avoid-version + +noinst_LTLIBRARIES = $(component_noinst) +libmca_oob_usock_la_SOURCES = $(sources) +libmca_oob_usock_la_LDFLAGS = -module -avoid-version + diff --git a/orte/mca/oob/usock/configure.m4 b/orte/mca/oob/usock/configure.m4 new file mode 100644 index 00000000000..c9a1b59f50a --- /dev/null +++ b/orte/mca/oob/usock/configure.m4 @@ -0,0 +1,42 @@ +# -*- shell-script -*- +# +# Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana +# University Research and Technology +# Corporation. All rights reserved. +# Copyright (c) 2004-2005 The University of Tennessee and The University +# of Tennessee Research Foundation. All rights +# reserved. +# Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, +# University of Stuttgart. All rights reserved. +# Copyright (c) 2004-2005 The Regents of the University of California. +# All rights reserved. +# Copyright (c) 2011-2013 Los Alamos National Security, LLC. +# All rights reserved. +# Copyright (c) 2010 Cisco Systems, Inc. All rights reserved. +# Copyright (c) 2013 Intel, Inc. All rights reserved. +# $COPYRIGHT$ +# +# Additional copyrights may follow +# +# $HEADER$ +# + +# MCA_oob_usock_CONFIG([action-if-found], [action-if-not-found]) +# ----------------------------------------------------------- +AC_DEFUN([MCA_orte_oob_usock_CONFIG],[ + AC_CONFIG_FILES([orte/mca/oob/usock/Makefile]) + + # check for sockaddr_un (a good sign we have Unix domain sockets) + AC_CHECK_TYPES([struct sockaddr_un], + [oob_usock_happy="yes"], + [oob_usock_happy="no"], + [AC_INCLUDES_DEFAULT +#ifdef HAVE_SYS_SOCKET_H +#include +#endif +#ifdef HAVE_SYS_UN_H +#include +#endif]) + + AS_IF([test "$oob_usock_happy" = "yes"], [$1], [$2]) +])dnl diff --git a/orte/mca/oob/usock/help-oob-usock.txt b/orte/mca/oob/usock/help-oob-usock.txt new file mode 100644 index 00000000000..6eb8ac0542a --- /dev/null +++ b/orte/mca/oob/usock/help-oob-usock.txt @@ -0,0 +1,70 @@ +# -*- text -*- +# +# Copyright (c) 2004-2006 The Trustees of Indiana University and Indiana +# University Research and Technology +# Corporation. All rights reserved. +# Copyright (c) 2004-2006 The University of Tennessee and The University +# of Tennessee Research Foundation. All rights +# reserved. +# Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, +# University of Stuttgart. All rights reserved. +# Copyright (c) 2004-2005 The Regents of the University of California. +# All rights reserved. +# $COPYRIGHT$ +# +# Additional copyrights may follow +# +# $HEADER$ +# +[static-and-dynamic] +Both static and dynamic port ranges were specified for the +out-of-band (OOB) communication subsystem: + +Static ports: %s +Dynamic ports: %s + +Only one can be specified. Please choose either static or +dynamic ports and try again. +# +[include-exclude] +Both TCP interface include and exclude lists were specified: + + Include: %s + Exclude: %s + +Only one of these can be given. +# +[not-parseable] +The specified network is not parseable. Since we cannot determine +your desired intent, we cannot establish a TCP socket for out-of-band +communications and will therefore abort. Please correct the network +specification and retry. +# +[no-included-found] +None of the networks specified to be included for out-of-band communications +could be found: + + Value given: %s + +Please revise the specification and try again. +# +[excluded-all] +The specified list of networks to be excluded for out-of-band communications +resulted in no networks being available: + + Value given: %s + +Please revise the specification and try again. +# +[no-interfaces-avail] +No network interfaces were found for out-of-band communications. We require +at least one available network for TCP-based messaging. +# +[invalid if_inexclude] +WARNING: An invalid value was given for oob_tcp_if_%s. This +value will be ignored. + + Local host: %s + Value: %s + Message: %s +# diff --git a/orte/mca/oob/usock/oob_usock.c b/orte/mca/oob/usock/oob_usock.c new file mode 100644 index 00000000000..2f88abda0ff --- /dev/null +++ b/orte/mca/oob/usock/oob_usock.c @@ -0,0 +1,484 @@ +/* + * Copyright (c) 2004-2010 The Trustees of Indiana University and Indiana + * University Research and Technology + * Corporation. All rights reserved. + * Copyright (c) 2004-2011 The University of Tennessee and The University + * of Tennessee Research Foundation. All rights + * reserved. + * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, + * University of Stuttgart. All rights reserved. + * Copyright (c) 2004-2005 The Regents of the University of California. + * All rights reserved. + * Copyright (c) 2006-2013 Los Alamos National Security, LLC. + * All rights reserved. + * Copyright (c) 2009-2015 Cisco Systems, Inc. All rights reserved. + * Copyright (c) 2011 Oak Ridge National Labs. All rights reserved. + * Copyright (c) 2013-2016 Intel, Inc. All rights reserved. + * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + * + */ + +#include "orte_config.h" +#include "orte/types.h" +#include "opal/types.h" + +#ifdef HAVE_UNISTD_H +#include +#endif +#ifdef HAVE_SYS_TYPES_H +#include +#endif +#include +#ifdef HAVE_NETINET_IN_H +#include +#endif +#ifdef HAVE_ARPA_INET_H +#include +#endif +#ifdef HAVE_NETDB_H +#include +#endif +#include + +#include "opal/util/show_help.h" +#include "opal/util/error.h" +#include "opal/util/output.h" +#include "opal/opal_socket_errno.h" +#include "opal/util/if.h" +#include "opal/util/net.h" +#include "opal/util/argv.h" +#include "opal/class/opal_hash_table.h" + +#include "orte/mca/errmgr/errmgr.h" +#include "orte/mca/ess/ess.h" +#include "orte/util/name_fns.h" +#include "orte/util/parse_options.h" +#include "orte/util/show_help.h" +#include "orte/runtime/orte_globals.h" + +#include "orte/mca/oob/usock/oob_usock.h" +#include "orte/mca/oob/usock/oob_usock_component.h" +#include "orte/mca/oob/usock/oob_usock_peer.h" +#include "orte/mca/oob/usock/oob_usock_connection.h" +#include "orte/mca/oob/usock/oob_usock_ping.h" + +static void usock_init(void); +static void usock_fini(void); +static void accept_connection(const int accepted_fd, + const struct sockaddr *addr); +static void ping(const orte_process_name_t *proc); +static void send_nb(orte_rml_send_t *msg); +static void ft_event(int state); + +mca_oob_usock_module_t mca_oob_usock_module = { + { + usock_init, + usock_fini, + accept_connection, + ping, + send_nb, + ft_event + } +}; + +/* + * Local utility functions + */ +static void recv_handler(int sd, short flags, void* user); +static void* progress_thread_engine(opal_object_t *obj) +{ + opal_output_verbose(2, orte_oob_base_framework.framework_output, + "%s USOCK PROGRESS THREAD RUNNING", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)); + + while (mca_oob_usock_module.ev_active) { + opal_event_loop(mca_oob_usock_module.ev_base, OPAL_EVLOOP_ONCE); + } + return OPAL_THREAD_CANCELLED; +} + + +/* + * Initialize global variables used w/in this module. + */ +static void usock_init(void) +{ + /* setup the module's state variables */ + OBJ_CONSTRUCT(&mca_oob_usock_module.peers, opal_hash_table_t); + opal_hash_table_init(&mca_oob_usock_module.peers, 32); + mca_oob_usock_module.ev_active = false; + + if (orte_oob_base.use_module_threads) { + /* if we are to use independent progress threads at + * the module level, start it now + */ + opal_output_verbose(2, orte_oob_base_framework.framework_output, + "%s STARTING USOCK PROGRESS THREAD", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)); + mca_oob_usock_module.ev_base = opal_event_base_create(); + /* construct the thread object */ + OBJ_CONSTRUCT(&mca_oob_usock_module.progress_thread, opal_thread_t); + /* fork off a thread to progress it */ + mca_oob_usock_module.progress_thread.t_run = progress_thread_engine; + mca_oob_usock_module.progress_thread.t_arg = NULL; + mca_oob_usock_module.ev_active = true; + if (OPAL_SUCCESS != opal_thread_start(&mca_oob_usock_module.progress_thread)) { + opal_output(0, "%s USOCK progress thread failed to start", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)); + } + } else { + mca_oob_usock_module.ev_base = orte_event_base; + } +} + +/* + * Module cleanup. + */ +static void usock_fini(void) +{ + /* cleanup all peers */ + OBJ_DESTRUCT(&mca_oob_usock_module.peers); + + if (mca_oob_usock_module.ev_active) { + /* if we used an independent progress thread at + * the module level, stop it now + */ + opal_output_verbose(2, orte_oob_base_framework.framework_output, + "%s STOPPING USOCK PROGRESS THREAD", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)); + /* stop the progress thread */ + mca_oob_usock_module.ev_active = false; + /* break the event loop */ + opal_event_base_loopexit(mca_oob_usock_module.ev_base); + /* wait for thread to exit */ + opal_thread_join(&mca_oob_usock_module.progress_thread, NULL); + OBJ_DESTRUCT(&mca_oob_usock_module.progress_thread); + /* release the event base */ + opal_event_base_free(mca_oob_usock_module.ev_base); + } +} + +/* Called by mca_oob_usock_accept() and connection_handler() on + * a socket that has been accepted. This call finishes processing the + * socket by registering for the OOB-level connection handshake. Used + * in both the threaded and event listen modes. + */ +static void accept_connection(const int accepted_fd, + const struct sockaddr *addr) +{ + opal_output_verbose(OOB_USOCK_DEBUG_CONNECT, orte_oob_base_framework.framework_output, + "%s accept_connection", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)); + + /* use a one-time event to wait for receipt of peer's + * process ident message to complete this connection + */ + ORTE_ACTIVATE_USOCK_ACCEPT_STATE(accepted_fd, addr, recv_handler); +} + +/* API functions */ +static void process_ping(int fd, short args, void *cbdata) +{ + mca_oob_usock_ping_t *op = (mca_oob_usock_ping_t*)cbdata; + mca_oob_usock_peer_t *peer; + + opal_output_verbose(2, orte_oob_base_framework.framework_output, + "%s:[%s:%d] processing ping to peer %s", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + __FILE__, __LINE__, + ORTE_NAME_PRINT(&op->peer)); + + /* do we know this peer? */ + if (NULL == (peer = mca_oob_usock_peer_lookup(&op->peer))) { + /* push this back to the framework so another component can try */ + opal_output_verbose(2, orte_oob_base_framework.framework_output, + "%s:[%s:%d] hop %s unknown", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + __FILE__, __LINE__, + ORTE_NAME_PRINT(&op->peer)); +#if 0 + ORTE_ACTIVATE_USOCK_MSG_ERROR(NULL, NULL, &op->peer, mca_oob_usock_component_hop_unknown); +#endif + goto cleanup; + } + + /* if we are already connected, there is nothing to do */ + if (MCA_OOB_USOCK_CONNECTED == peer->state) { + opal_output_verbose(2, orte_oob_base_framework.framework_output, + "%s:[%s:%d] already connected to peer %s", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + __FILE__, __LINE__, + ORTE_NAME_PRINT(&op->peer)); + goto cleanup; + } + + /* if we are already connecting, there is nothing to do */ + if (MCA_OOB_USOCK_CONNECTING == peer->state || + MCA_OOB_USOCK_CONNECT_ACK == peer->state) { + opal_output_verbose(2, orte_oob_base_framework.framework_output, + "%s:[%s:%d] already connecting to peer %s", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + __FILE__, __LINE__, + ORTE_NAME_PRINT(&op->peer)); + goto cleanup; + } + + /* attempt the connection */ + peer->state = MCA_OOB_USOCK_CONNECTING; + ORTE_ACTIVATE_USOCK_CONN_STATE(peer, mca_oob_usock_peer_try_connect); + + cleanup: + OBJ_RELEASE(op); +} + +static void ping(const orte_process_name_t *proc) +{ + opal_output_verbose(2, orte_oob_base_framework.framework_output, + "%s:[%s:%d] pinging peer %s", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + __FILE__, __LINE__, + ORTE_NAME_PRINT(proc)); + + /* push this into our event base for processing */ + ORTE_ACTIVATE_USOCK_PING(proc, process_ping); +} + +static void process_send(int fd, short args, void *cbdata) +{ + mca_oob_usock_msg_op_t *op = (mca_oob_usock_msg_op_t*)cbdata; + mca_oob_usock_peer_t *peer; + struct timeval tv; + + opal_output_verbose(2, orte_oob_base_framework.framework_output, + "%s:[%s:%d] processing send to peer %s", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + __FILE__, __LINE__, + ORTE_NAME_PRINT(&op->msg->dst)); + + /* if I am an app, the only route is to my daemon, so + * send the msg there + */ + if (ORTE_PROC_IS_APP) { + if (NULL == (peer = mca_oob_usock_peer_lookup(ORTE_PROC_MY_DAEMON))) { + /* we don't know how to talk to our daemon, + * which is strange since we already got here. + * likely means we lost a race condition, so + * + */ + ORTE_ACTIVATE_USOCK_MSG_ERROR(NULL, op->msg, + ORTE_PROC_MY_DAEMON, + mca_oob_usock_component_cannot_send); + goto cleanup; + } + } else if (ORTE_PROC_IS_DAEMON || ORTE_PROC_IS_HNP) { + /* if I am a daemon, the only way I should be given this + * message to send is if the proc is local to me + */ + if (NULL == (peer = mca_oob_usock_peer_lookup(&op->msg->dst))) { + /* try this again after a delay for N times */ + op->reps++; + if (20 < op->reps) { + /* we don't know how to talk to this proc, + * so send this back up to the OOB base so it + * can try another transport + */ + ORTE_ACTIVATE_USOCK_MSG_ERROR(NULL, op->msg, + &op->msg->dst, + mca_oob_usock_component_cannot_send); + goto cleanup; + } + opal_event_evtimer_set(orte_event_base, &op->ev, process_send, op); + opal_event_set_priority(&op->ev, ORTE_ERROR_PRI); + tv.tv_sec = 1; + tv.tv_usec = 0; + opal_event_evtimer_add(&op->ev, &tv); + return; + } + } else { + /* otherwise, this message can't be handled by me, so + * notify the component of the mistake + */ + opal_output(0, "CAN'T BE HANDLED"); + goto cleanup; + } + + /* add the msg to the target's send queue */ + if (MCA_OOB_USOCK_CONNECTED == peer->state) { + opal_output_verbose(2, orte_oob_base_framework.framework_output, + "%s usock:send_nb: already connected to %s - queueing for send", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + ORTE_NAME_PRINT(&peer->name)); + MCA_OOB_USOCK_QUEUE_SEND(op->msg, peer); + goto cleanup; + } + + /* add the message to the queue for sending after the + * connection is formed + */ + MCA_OOB_USOCK_QUEUE_PENDING(op->msg, peer); + + if (MCA_OOB_USOCK_CONNECTING != peer->state && + MCA_OOB_USOCK_CONNECT_ACK != peer->state) { + /* we have to initiate the connection - again, we do not + * want to block while the connection is created. + * So throw us into an event that will create + * the connection via a mini-state-machine :-) + */ + opal_output_verbose(2, orte_oob_base_framework.framework_output, + "%s usock:send_nb: initiating connection to %s", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + ORTE_NAME_PRINT(&peer->name)); + peer->state = MCA_OOB_USOCK_CONNECTING; + ORTE_ACTIVATE_USOCK_CONN_STATE(peer, mca_oob_usock_peer_try_connect); + } + + cleanup: + OBJ_RELEASE(op); +} + +static void send_nb(orte_rml_send_t *msg) +{ + opal_output_verbose(2, orte_oob_base_framework.framework_output, + "%s usock:send_nb to peer %s", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + ORTE_NAME_PRINT(&msg->dst)); + + /* push this into our event base for processing */ + ORTE_ACTIVATE_USOCK_POST_SEND(msg, process_send); +} + +/* + * Event callback when there is data available on the registered + * socket to recv. This is called for the listen sockets to accept an + * incoming connection, on new sockets trying to complete the software + * connection process, and for probes. Data on an established + * connection is handled elsewhere. + */ +static void recv_handler(int sd, short flags, void *cbdata) +{ + mca_oob_usock_conn_op_t *op = (mca_oob_usock_conn_op_t*)cbdata; + mca_oob_usock_hdr_t hdr; + mca_oob_usock_peer_t *peer; + uint64_t *ui64; + + opal_output_verbose(OOB_USOCK_DEBUG_CONNECT, orte_oob_base_framework.framework_output, + "%s:usock:recv:handler called", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)); + + /* get the handshake */ + if (ORTE_SUCCESS != mca_oob_usock_peer_recv_connect_ack(NULL, sd, &hdr)) { + goto cleanup; + } + + /* finish processing ident */ + if (MCA_OOB_USOCK_IDENT == hdr.type) { + if (NULL == (peer = mca_oob_usock_peer_lookup(&hdr.origin))) { + /* should never happen */ + goto cleanup; + } + /* set socket up to be non-blocking */ + if ((flags = fcntl(sd, F_GETFL, 0)) < 0) { + opal_output(0, "%s mca_oob_usock_recv_connect: fcntl(F_GETFL) failed: %s (%d)", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), strerror(opal_socket_errno), opal_socket_errno); + } else { + flags |= O_NONBLOCK; + if (fcntl(sd, F_SETFL, flags) < 0) { + opal_output(0, "%s mca_oob_usock_recv_connect: fcntl(F_SETFL) failed: %s (%d)", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), strerror(opal_socket_errno), opal_socket_errno); + } + } + + /* is the peer instance willing to accept this connection */ + peer->sd = sd; + if (mca_oob_usock_peer_accept(peer) == false) { + if (OOB_USOCK_DEBUG_CONNECT <= opal_output_get_verbosity(orte_oob_base_framework.framework_output)) { + opal_output(0, "%s-%s mca_oob_usock_recv_connect: " + "rejected connection state %d", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + ORTE_NAME_PRINT(&(peer->name)), + peer->state); + } + CLOSE_THE_SOCKET(sd); + ui64 = (uint64_t*)(&peer->name); + opal_hash_table_set_value_uint64(&mca_oob_usock_module.peers, (*ui64), NULL); + OBJ_RELEASE(peer); + } + } + + cleanup: + OBJ_RELEASE(op); +} + +/* Dummy function for when we are not using FT. */ +#if OPAL_ENABLE_FT_CR == 0 +static void ft_event(int state) +{ + return; +} + +#else +static void ft_event(int state) { +#if 0 + opal_list_item_t *item; +#endif + + if(OPAL_CRS_CHECKPOINT == state) { +#if 0 + /* + * Disable event processing while we are working + */ + opal_event_disable(); +#endif + } + else if(OPAL_CRS_CONTINUE == state) { +#if 0 + /* + * Resume event processing + */ + opal_event_enable(); + } + else if(OPAL_CRS_RESTART == state) { + /* + * Clean out cached connection information + * Select pieces of finalize/init + */ + for (item = opal_list_remove_first(&mod->peer_list); + item != NULL; + item = opal_list_remove_first(&mod->peer_list)) { + mca_oob_usock_peer_t* peer = (mca_oob_usock_peer_t*)item; + /* JJH: Use the below command for debugging restarts with invalid sockets + * mca_oob_usock_peer_dump(peer, "RESTART CLEAN") + */ + MCA_OOB_USOCK_PEER_RETURN(peer); + } + + OBJ_DESTRUCT(&mod->peer_free); + OBJ_DESTRUCT(&mod->peer_names); + OBJ_DESTRUCT(&mod->peers); + OBJ_DESTRUCT(&mod->peer_list); + + OBJ_CONSTRUCT(&mod->peer_list, opal_list_t); + OBJ_CONSTRUCT(&mod->peers, opal_hash_table_t); + OBJ_CONSTRUCT(&mod->peer_names, opal_hash_table_t); + OBJ_CONSTRUCT(&mod->peer_free, opal_free_list_t); + + /* + * Resume event processing + */ + opal_event_enable(); +#endif + } + else if(OPAL_CRS_TERM == state ) { + ; + } + else { + ; + } + + return; +} +#endif diff --git a/orte/mca/oob/usock/oob_usock.h b/orte/mca/oob/usock/oob_usock.h new file mode 100644 index 00000000000..f6fcbc56808 --- /dev/null +++ b/orte/mca/oob/usock/oob_usock.h @@ -0,0 +1,97 @@ +/* + * Copyright (c) 2004-2007 The Trustees of Indiana University and Indiana + * University Research and Technology + * Corporation. All rights reserved. + * Copyright (c) 2004-2006 The University of Tennessee and The University + * of Tennessee Research Foundation. All rights + * reserved. + * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, + * University of Stuttgart. All rights reserved. + * Copyright (c) 2004-2005 The Regents of the University of California. + * All rights reserved. + * Copyright (c) 2006-2013 Los Alamos National Security, LLC. + * All rights reserved. + * Copyright (c) 2010-2011 Cisco Systems, Inc. All rights reserved. + * Copyright (c) 2013-2014 Intel, Inc. All rights reserved. + * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ + +#ifndef _MCA_OOB_USOCK_H_ +#define _MCA_OOB_USOCK_H_ + +#include "orte_config.h" + +#include "orte/types.h" + +#include "opal/mca/base/base.h" +#include "opal/class/opal_free_list.h" +#include "opal/class/opal_hash_table.h" +#include "opal/mca/event/event.h" + +#include "orte/mca/oob/oob.h" +#include "orte/mca/oob/base/base.h" + + +BEGIN_C_DECLS + +/* define some debug levels */ +#define OOB_USOCK_DEBUG_FAIL 2 +#define OOB_USOCK_DEBUG_CONNECT 7 + +/* forward declare a couple of structures */ +struct mca_oob_usock_module_t; +struct mca_oob_usock_msg_error_t; + +/* Module definition */ +typedef void (*mca_oob_usock_module_init_fn_t)(void); +typedef void (*mca_oob_usock_module_fini_fn_t)(void); +typedef void (*mca_oob_usock_module_accept_connection_fn_t)(const int accepted_fd, + const struct sockaddr *addr); +typedef void (*mca_oob_usock_module_ping_fn_t)(const orte_process_name_t *proc); +typedef void (*mca_oob_usock_module_send_nb_fn_t)(orte_rml_send_t *msg); +typedef void (*mca_oob_usock_module_ft_event_fn_t)(int state); + +typedef struct { + mca_oob_usock_module_init_fn_t init; + mca_oob_usock_module_fini_fn_t finalize; + mca_oob_usock_module_accept_connection_fn_t accept_connection; + mca_oob_usock_module_ping_fn_t ping; + mca_oob_usock_module_send_nb_fn_t send_nb; + mca_oob_usock_module_ft_event_fn_t ft_event; +} mca_oob_usock_module_api_t; +typedef struct { + mca_oob_usock_module_api_t api; + opal_event_base_t *ev_base; /* event base for the module progress thread */ + bool ev_active; + opal_thread_t progress_thread; + opal_hash_table_t peers; // peer connection info +} mca_oob_usock_module_t; +ORTE_MODULE_DECLSPEC extern mca_oob_usock_module_t mca_oob_usock_module; + +/** + * the state of the connection + */ +typedef enum { + MCA_OOB_USOCK_UNCONNECTED, + MCA_OOB_USOCK_CLOSED, + MCA_OOB_USOCK_RESOLVE, + MCA_OOB_USOCK_CONNECTING, + MCA_OOB_USOCK_CONNECT_ACK, + MCA_OOB_USOCK_CONNECTED, + MCA_OOB_USOCK_FAILED, + MCA_OOB_USOCK_ACCEPTING +} mca_oob_usock_state_t; + +/* module-level shared functions */ +ORTE_MODULE_DECLSPEC void mca_oob_usock_send_handler(int fd, short args, void *cbdata); +ORTE_MODULE_DECLSPEC void mca_oob_usock_recv_handler(int fd, short args, void *cbdata); + + +END_C_DECLS + +#endif /* MCA_OOB_USOCK_H_ */ + diff --git a/orte/mca/oob/usock/oob_usock_component.c b/orte/mca/oob/usock/oob_usock_component.c new file mode 100644 index 00000000000..16e408db4b4 --- /dev/null +++ b/orte/mca/oob/usock/oob_usock_component.c @@ -0,0 +1,595 @@ +/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */ +/* + * Copyright (c) 2004-2010 The Trustees of Indiana University and Indiana + * University Research and Technology + * Corporation. All rights reserved. + * Copyright (c) 2004-2011 The University of Tennessee and The University + * of Tennessee Research Foundation. All rights + * reserved. + * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, + * University of Stuttgart. All rights reserved. + * Copyright (c) 2004-2005 The Regents of the University of California. + * All rights reserved. + * Copyright (c) 2006-2015 Los Alamos National Security, LLC. + * All rights reserved. + * Copyright (c) 2009-2013 Cisco Systems, Inc. All rights reserved. + * Copyright (c) 2011 Oak Ridge National Labs. All rights reserved. + * Copyright (c) 2013-2015 Intel, Inc. All rights reserved. + * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + * + * In windows, many of the socket functions return an EWOULDBLOCK + * instead of things like EAGAIN, EINPROGRESS, etc. It has been + * verified that this will not conflict with other error codes that + * are returned by these functions under UNIX/Linux environments + */ + +#include "orte_config.h" +#include "orte/types.h" +#include "opal/types.h" + +#ifdef HAVE_UNISTD_H +#include +#endif +#ifdef HAVE_SYS_TYPES_H +#include +#endif +#include +#ifdef HAVE_NETINET_IN_H +#include +#endif +#ifdef HAVE_ARPA_INET_H +#include +#endif +#ifdef HAVE_NETDB_H +#include +#endif +#include + +#include "opal/util/show_help.h" +#include "opal/util/error.h" +#include "opal/util/os_path.h" +#include "opal/util/output.h" +#include "opal/opal_socket_errno.h" +#include "opal/util/if.h" +#include "opal/util/net.h" +#include "opal/util/argv.h" +#include "opal/class/opal_hash_table.h" +#include "opal/class/opal_list.h" + +#include "orte/mca/errmgr/errmgr.h" +#include "orte/mca/ess/ess.h" +#include "orte/mca/state/state.h" +#include "orte/util/listener.h" +#include "orte/util/name_fns.h" +#include "orte/util/parse_options.h" +#include "orte/util/session_dir.h" +#include "orte/util/show_help.h" +#include "orte/runtime/orte_globals.h" + +#include "orte/mca/oob/usock/oob_usock.h" +#include "orte/mca/oob/usock/oob_usock_component.h" +#include "orte/mca/oob/usock/oob_usock_peer.h" +#include "orte/mca/oob/usock/oob_usock_connection.h" +#include "orte/mca/oob/usock/oob_usock_ping.h" +/* + * Local utility functions + */ + +static int usock_component_register(void); +static int usock_component_open(void); +static int usock_component_close(void); + +static int component_available(void); +static int component_startup(void); +static void component_shutdown(void); +static int component_send(orte_rml_send_t *msg); +static char* component_get_addr(void); +static int component_set_addr(orte_process_name_t *peer, + char **uris); +static bool component_is_reachable(orte_process_name_t *peer); + +/* + * Struct of function pointers and all that to let us be initialized + */ +mca_oob_usock_component_t mca_oob_usock_component = { + { + .oob_base = { + MCA_OOB_BASE_VERSION_2_0_0, + .mca_component_name = "usock", + MCA_BASE_MAKE_VERSION(component, ORTE_MAJOR_VERSION, ORTE_MINOR_VERSION, + ORTE_RELEASE_VERSION), + .mca_open_component = usock_component_open, + .mca_close_component = usock_component_close, + .mca_register_component_params = usock_component_register, + }, + .oob_data = { + /* The component is checkpoint ready */ + MCA_BASE_METADATA_PARAM_CHECKPOINT + }, + .priority = 100, + .available = component_available, + .startup = component_startup, + .shutdown = component_shutdown, + .send_nb = component_send, + .get_addr = component_get_addr, + .set_addr = component_set_addr, + .is_reachable = component_is_reachable, + }, +}; + +/* + * Initialize global variables used w/in this module. + */ +static int usock_component_open(void) +{ + return ORTE_SUCCESS; +} + +/* + * Cleanup of global variables used by this module. + */ +static int usock_component_close(void) +{ + return ORTE_SUCCESS; +} + + +static int usock_component_register(void) +{ + mca_base_component_t *component = &mca_oob_usock_component.super.oob_base; + + /* register oob module parameters */ + mca_oob_usock_component.max_retries = 2; + (void)mca_base_component_var_register(component, "peer_retries", + "Number of times to try shutting down a connection before giving up", + MCA_BASE_VAR_TYPE_INT, NULL, 0, 0, + OPAL_INFO_LVL_9, + MCA_BASE_VAR_SCOPE_LOCAL, + &mca_oob_usock_component.max_retries); + + return ORTE_SUCCESS; +} + + +static int component_available(void) +{ + opal_output_verbose(5, orte_oob_base_framework.framework_output, + "oob:usock: component_available called"); + + /* if session directories were forbidden, then we cannot be used */ + if (!orte_create_session_dirs || + NULL == orte_process_info.tmpdir_base || + NULL == orte_process_info.top_session_dir) { + return ORTE_ERR_NOT_SUPPORTED; + } + + /* this component is not available to tools */ + if (ORTE_PROC_IS_TOOL) { + return ORTE_ERR_NOT_AVAILABLE; + } + + if (ORTE_PROC_IS_APP) { + if (NULL == orte_process_info.my_daemon_uri) { + /* direct-launched apps cannot use it */ + return ORTE_ERR_NOT_AVAILABLE; + } + } + + /* otherwise, we are available */ + return ORTE_SUCCESS; +} + +/* + * Handler for accepting connections from the event library + */ +static void connection_event_handler(int incoming_sd, short flags, void* cbdata) +{ + orte_pending_connection_t *pending = (orte_pending_connection_t*)cbdata; + int sd; + + sd = pending->fd; + pending->fd = -1; + OBJ_RELEASE(pending); + + /* process the connection */ + mca_oob_usock_module.api.accept_connection(sd, NULL); +} + +/* Start the module */ +static int component_startup(void) +{ + int rc=ORTE_SUCCESS; + + opal_output_verbose(2, orte_oob_base_framework.framework_output, + "%s USOCK STARTUP", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)); + + /* setup the path to the daemon rendezvous point */ + memset(&mca_oob_usock_component.address, 0, sizeof(struct sockaddr_un)); + mca_oob_usock_component.address.sun_family = AF_UNIX; + snprintf(mca_oob_usock_component.address.sun_path, + sizeof(mca_oob_usock_component.address.sun_path)-1, + "%s/%s/%s/0/%s", orte_process_info.tmpdir_base, + orte_process_info.top_session_dir, + ORTE_JOB_FAMILY_PRINT(ORTE_PROC_MY_NAME->jobid), "usock"); + opal_output_verbose(2, orte_oob_base_framework.framework_output, + "SUNPATH: %s", mca_oob_usock_component.address.sun_path); + + /* if we are a daemon/HNP, register our listener */ + if (ORTE_PROC_IS_DAEMON || ORTE_PROC_IS_HNP) { + if (ORTE_SUCCESS != (rc = orte_register_listener((struct sockaddr*)&mca_oob_usock_component.address, sizeof(struct sockaddr_un), + orte_event_base, connection_event_handler))) { + ORTE_ERROR_LOG(rc); + } + } else { + /* if the rendezvous point isn't there, then that's an error */ + /* if the rendezvous file doesn't exist, that's an error */ + if (0 != access(mca_oob_usock_component.address.sun_path, R_OK)) { + opal_output_verbose(2, orte_oob_base_framework.framework_output, + "SUNPATH: %s NOT READABLE", mca_oob_usock_component.address.sun_path); + return OPAL_ERR_NOT_FOUND; + } + } + + /* start the module */ + mca_oob_usock_module.api.init(); + + return rc; +} + +static void component_shutdown(void) +{ + opal_output_verbose(2, orte_oob_base_framework.framework_output, + "%s USOCK SHUTDOWN", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)); + + if (ORTE_PROC_IS_DAEMON || ORTE_PROC_IS_HNP) { + /* delete the rendezvous file */ + unlink(mca_oob_usock_component.address.sun_path); + } + + /* shutdown the module */ + if (NULL != mca_oob_usock_module.api.finalize) { + mca_oob_usock_module.api.finalize(); + } +} + +static int component_send(orte_rml_send_t *msg) +{ + orte_proc_t *proc; + + opal_output_verbose(5, orte_oob_base_framework.framework_output, + "%s oob:usock:send_nb to peer %s:%d ", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + ORTE_NAME_PRINT(&msg->dst), msg->tag); + + if (ORTE_PROC_IS_DAEMON || ORTE_PROC_IS_HNP) { + /* daemons can only reach local procs */ + if (NULL == (proc = orte_get_proc_object(&msg->dst))) { + return ORTE_ERR_TAKE_NEXT_OPTION; + } + if (!ORTE_FLAG_TEST(proc, ORTE_PROC_FLAG_LOCAL)) { + return ORTE_ERR_TAKE_NEXT_OPTION; + } + } + + /* apps can reach anyone via this module as the daemon + * will route the message to the final destination + */ + + mca_oob_usock_module.api.send_nb(msg); + return ORTE_SUCCESS; +} + +/* although we do not use the uri to determine a peer's + * address (since we know the path via the session directory), + * we have to provide something to the uri. This is needed + * as other places in ORTE use a NULL uri to indicate lack + * of a daemon. We may eventually remove that dependency, + * but for now, just ensure that the uri is never NULL, + * even if we are the only active OOB transport. + */ +static char* component_get_addr(void) +{ + char *tmp; + tmp = strdup("usock"); + return tmp; +} + +static int component_set_addr(orte_process_name_t *peer, + char **uris) +{ + orte_proc_t *proc; + mca_oob_usock_peer_t *pr; + uint64_t *ui64; + + /* if I am an application, then everything is addressable + * by me via my daemon + */ + if (ORTE_PROC_IS_APP) { + /* if this is my daemon, then take it - otherwise, ignore */ + if (ORTE_PROC_MY_DAEMON->jobid == peer->jobid && + ORTE_PROC_MY_DAEMON->vpid == peer->vpid) { + ui64 = (uint64_t*)peer; + if (OPAL_SUCCESS != opal_hash_table_get_value_uint64(&mca_oob_usock_module.peers, + (*ui64), (void**)&pr) || NULL == pr) { + pr = OBJ_NEW(mca_oob_usock_peer_t); + pr->name = *peer; + opal_hash_table_set_value_uint64(&mca_oob_usock_module.peers, (*ui64), pr); + } + /* we have to initiate the connection because otherwise the + * daemon has no way to communicate to us via this component + * as the app doesn't have a listening port */ + pr->state = MCA_OOB_USOCK_CONNECTING; + ORTE_ACTIVATE_USOCK_CONN_STATE(pr, mca_oob_usock_peer_try_connect); + return ORTE_SUCCESS; + } + /* otherwise, indicate that we cannot reach this peer */ + return ORTE_ERR_TAKE_NEXT_OPTION; + } + + /* if I am a daemon or HNP, I can only reach my + * own local procs via this component + */ + if (ORTE_PROC_MY_NAME->jobid == peer->jobid) { + /* another daemon */ + return ORTE_ERR_TAKE_NEXT_OPTION; + } + if (NULL == (proc = orte_get_proc_object(peer)) || + !ORTE_FLAG_TEST(proc, ORTE_PROC_FLAG_LOCAL)) { + return ORTE_ERR_TAKE_NEXT_OPTION; + } + /* indicate that this peer is addressable by this component */ + ui64 = (uint64_t*)peer; + if (OPAL_SUCCESS != opal_hash_table_get_value_uint64(&mca_oob_usock_module.peers, + (*ui64), (void**)&pr) || NULL == pr) { + pr = OBJ_NEW(mca_oob_usock_peer_t); + pr->name = *peer; + opal_hash_table_set_value_uint64(&mca_oob_usock_module.peers, (*ui64), pr); + } + return ORTE_SUCCESS; +} + +void mca_oob_usock_component_set_module(int fd, short args, void *cbdata) +{ + mca_oob_usock_peer_op_t *pop = (mca_oob_usock_peer_op_t*)cbdata; + uint64_t ui64; + int rc; + orte_oob_base_peer_t *bpr; + + opal_output_verbose(OOB_USOCK_DEBUG_CONNECT, orte_oob_base_framework.framework_output, + "%s usock:set_module called for peer %s", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + ORTE_NAME_PRINT(&pop->peer->name)); + + /* retrieve the peer's name */ + memcpy(&ui64, (char*)&(pop->peer->name), sizeof(uint64_t)); + + /* make sure the OOB knows that we are handling this peer - we + * are in the same event base as the OOB base, so we can + * directly access its storage + */ + if (OPAL_SUCCESS != opal_hash_table_get_value_uint64(&orte_oob_base.peers, + ui64, (void**)&bpr) || NULL == bpr) { + bpr = OBJ_NEW(orte_oob_base_peer_t); + } + opal_bitmap_set_bit(&bpr->addressable, mca_oob_usock_component.super.idx); + bpr->component = &mca_oob_usock_component.super; + if (OPAL_SUCCESS != (rc = opal_hash_table_set_value_uint64(&orte_oob_base.peers, + ui64, bpr))) { + ORTE_ERROR_LOG(rc); + } + + OBJ_RELEASE(pop); +} + +void mca_oob_usock_component_lost_connection(int fd, short args, void *cbdata) +{ + mca_oob_usock_peer_op_t *pop = (mca_oob_usock_peer_op_t*)cbdata; + uint64_t ui64; + int rc; + + opal_output_verbose(OOB_USOCK_DEBUG_CONNECT, orte_oob_base_framework.framework_output, + "%s usock:lost connection called for peer %s", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + ORTE_NAME_PRINT(&pop->peer->name)); + + /* retrieve the peer's name */ + memcpy(&ui64, (char*)&(pop->peer->name), sizeof(uint64_t)); + + /* mark the OOB's table that we can't reach it any more - for now, we don't + * worry about shifting to another component. Eventually, we will want to push + * this decision to the OOB so it can try other components and eventually error out + */ + if (OPAL_SUCCESS != (rc = opal_hash_table_set_value_uint64(&orte_oob_base.peers, + ui64, NULL))) { + ORTE_ERROR_LOG(rc); + } + + /* activate the proc state - since an app only connects to its parent daemon, + * and the daemon is *always* its lifeline, activate the lifeline lost state */ + if (ORTE_PROC_IS_APP) { + ORTE_ACTIVATE_PROC_STATE(&pop->peer->name, ORTE_PROC_STATE_LIFELINE_LOST); + } else { + /* we are the daemon end, so notify that the child's comm failed */ + ORTE_ACTIVATE_PROC_STATE(&pop->peer->name, ORTE_PROC_STATE_COMM_FAILED); + } + + OBJ_RELEASE(pop); +} + +void mca_oob_usock_component_cannot_send(int fd, short args, void *cbdata) +{ + mca_oob_usock_msg_error_t *pop = (mca_oob_usock_msg_error_t*)cbdata; + uint64_t ui64; + int rc; + + opal_output_verbose(OOB_USOCK_DEBUG_CONNECT, orte_oob_base_framework.framework_output, + "%s usock:unable to send to peer %s", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + ORTE_NAME_PRINT(&pop->hop)); + + /* retrieve the peer's name */ + memcpy(&ui64, (char*)&(pop->hop), sizeof(uint64_t)); + + /* mark the OOB's table that we can't reach it any more - for now, we don't + * worry about shifting to another component. Eventually, we will want to push + * this decision to the OOB so it can try other components and eventually error out + */ + if (OPAL_SUCCESS != (rc = opal_hash_table_set_value_uint64(&orte_oob_base.peers, + ui64, NULL))) { + ORTE_ERROR_LOG(rc); + } + + /* have the OOB base try to send it again */ + ORTE_OOB_SEND(pop->rmsg); + + OBJ_RELEASE(pop); +} + +void mca_oob_usock_component_failed_to_connect(int fd, short args, void *cbdata) +{ + mca_oob_usock_peer_op_t *pop = (mca_oob_usock_peer_op_t*)cbdata; + + opal_output_verbose(OOB_USOCK_DEBUG_CONNECT, orte_oob_base_framework.framework_output, + "%s usock:failed_to_connect called for peer %s", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + ORTE_NAME_PRINT(&pop->peer->name)); + + /* if we are terminating, then don't do anything further */ + if (orte_orteds_term_ordered || orte_finalizing || orte_abnormal_term_ordered) { + OBJ_RELEASE(pop); + return; + } + + /* activate the proc state */ + opal_output_verbose(OOB_USOCK_DEBUG_CONNECT, orte_oob_base_framework.framework_output, + "%s usock:failed_to_connect unable to reach peer %s", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + ORTE_NAME_PRINT(&pop->peer->name)); + + /* since an app only connects to its parent daemon, + * and the daemon is *always* its lifeline, activate the lifeline lost state */ + if (ORTE_PROC_IS_APP) { + ORTE_ACTIVATE_PROC_STATE(&pop->peer->name, ORTE_PROC_STATE_LIFELINE_LOST); + } else { + /* we are the daemon end, so notify that the child's comm failed */ + ORTE_ACTIVATE_PROC_STATE(&pop->peer->name, ORTE_PROC_STATE_COMM_FAILED); + } + OBJ_RELEASE(pop); +} + +static bool component_is_reachable(orte_process_name_t *peer) +{ + orte_proc_t *proc; + + /* if I am an application, then everything is reachable + * by me via my daemon + */ + if (ORTE_PROC_IS_APP) { + return true; + } + + /* if I am a daemon or HNP, I can only reach my + * own local procs via this component + */ + if (ORTE_PROC_MY_NAME->jobid == peer->jobid) { + /* another daemon */ + return false; + } + if (NULL == (proc = orte_get_proc_object(peer)) || + !ORTE_FLAG_TEST(proc, ORTE_PROC_FLAG_LOCAL)) { + return false; + } + /* indicate that this peer is reachable by this component */ + return true; +} + +char* mca_oob_usock_state_print(mca_oob_usock_state_t state) +{ + switch (state) { + case MCA_OOB_USOCK_UNCONNECTED: + return "UNCONNECTED"; + case MCA_OOB_USOCK_CLOSED: + return "CLOSED"; + case MCA_OOB_USOCK_RESOLVE: + return "RESOLVE"; + case MCA_OOB_USOCK_CONNECTING: + return "CONNECTING"; + case MCA_OOB_USOCK_CONNECT_ACK: + return "ACK"; + case MCA_OOB_USOCK_CONNECTED: + return "CONNECTED"; + case MCA_OOB_USOCK_FAILED: + return "FAILED"; + default: + return "UNKNOWN"; + } +} + + +mca_oob_usock_peer_t* mca_oob_usock_peer_lookup(const orte_process_name_t *name) +{ + mca_oob_usock_peer_t *peer; + uint64_t ui64; + + memcpy(&ui64, (char*)name, sizeof(uint64_t)); + if (OPAL_SUCCESS != opal_hash_table_get_value_uint64(&mca_oob_usock_module.peers, ui64, (void**)&peer)) { + return NULL; + } + return peer; +} + +/* OOB USOCK Class instances */ + +static void peer_cons(mca_oob_usock_peer_t *peer) +{ + peer->auth_method = NULL; + peer->sd = -1; + peer->state = MCA_OOB_USOCK_UNCONNECTED; + peer->retries = 0; + OBJ_CONSTRUCT(&peer->send_queue, opal_list_t); + peer->send_msg = NULL; + peer->recv_msg = NULL; + peer->send_ev_active = false; + peer->recv_ev_active = false; + peer->timer_ev_active = false; +} +static void peer_des(mca_oob_usock_peer_t *peer) +{ + if (NULL != peer->auth_method) { + free(peer->auth_method); + } + if (0 <= peer->sd) { + CLOSE_THE_SOCKET(peer->sd); + } + OPAL_LIST_DESTRUCT(&peer->send_queue); +} +OBJ_CLASS_INSTANCE(mca_oob_usock_peer_t, + opal_list_item_t, + peer_cons, peer_des); + +OBJ_CLASS_INSTANCE(mca_oob_usock_peer_op_t, + opal_object_t, + NULL, NULL); + +static void mopcon(mca_oob_usock_msg_op_t *p) +{ + p->reps = 0; +} +OBJ_CLASS_INSTANCE(mca_oob_usock_msg_op_t, + opal_object_t, + mopcon, NULL); + +OBJ_CLASS_INSTANCE(mca_oob_usock_conn_op_t, + opal_object_t, + NULL, NULL); + +OBJ_CLASS_INSTANCE(mca_oob_usock_ping_t, + opal_object_t, + NULL, NULL); + diff --git a/orte/mca/oob/usock/oob_usock_component.h b/orte/mca/oob/usock/oob_usock_component.h new file mode 100644 index 00000000000..a0bc004e793 --- /dev/null +++ b/orte/mca/oob/usock/oob_usock_component.h @@ -0,0 +1,64 @@ +/* + * Copyright (c) 2004-2007 The Trustees of Indiana University and Indiana + * University Research and Technology + * Corporation. All rights reserved. + * Copyright (c) 2004-2006 The University of Tennessee and The University + * of Tennessee Research Foundation. All rights + * reserved. + * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, + * University of Stuttgart. All rights reserved. + * Copyright (c) 2004-2005 The Regents of the University of California. + * All rights reserved. + * Copyright (c) 2006-2013 Los Alamos National Security, LLC. + * All rights reserved. + * Copyright (c) 2010-2011 Cisco Systems, Inc. All rights reserved. + * Copyright (c) 2013-2014 Intel, Inc. All rights reserved. + * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ + +#ifndef _MCA_OOB_USOCK_COMPONENT_H_ +#define _MCA_OOB_USOCK_COMPONENT_H_ + +#include "orte_config.h" + +#ifdef HAVE_SYS_TIME_H +#include +#endif +#ifdef HAVE_SYS_SOCKET_H +#include +#endif +#ifdef HAVE_SYS_UN_H +#include +#endif + +#include "opal/class/opal_bitmap.h" +#include "opal/class/opal_list.h" +#include "opal/class/opal_pointer_array.h" + +#include "orte/mca/oob/oob.h" +#include "oob_usock_peer.h" +#include "oob_usock.h" + +/** + * OOB USOCK Component + */ +typedef struct { + mca_oob_base_component_t super; /**< base OOB component */ + int max_retries; /**< max number of retries before declaring peer gone */ + struct sockaddr_un address; /**< address of our rendezvous point */ +} mca_oob_usock_component_t; + +ORTE_MODULE_DECLSPEC extern mca_oob_usock_component_t mca_oob_usock_component; + +ORTE_MODULE_DECLSPEC char* mca_oob_usock_state_print(mca_oob_usock_state_t state); +ORTE_MODULE_DECLSPEC void mca_oob_usock_component_set_module(int fd, short args, void *cbdata); +ORTE_MODULE_DECLSPEC void mca_oob_usock_component_lost_connection(int fd, short args, void *cbdata); +ORTE_MODULE_DECLSPEC void mca_oob_usock_component_failed_to_connect(int fd, short args, void *cbdata); +ORTE_MODULE_DECLSPEC mca_oob_usock_peer_t* mca_oob_usock_peer_lookup(const orte_process_name_t *name); +ORTE_MODULE_DECLSPEC void mca_oob_usock_component_cannot_send(int fd, short args, void *cbdata); + +#endif /* _MCA_OOB_USOCK_COMPONENT_H_ */ diff --git a/orte/mca/oob/usock/oob_usock_connection.c b/orte/mca/oob/usock/oob_usock_connection.c new file mode 100644 index 00000000000..0fad7afd657 --- /dev/null +++ b/orte/mca/oob/usock/oob_usock_connection.c @@ -0,0 +1,964 @@ +/* + * Copyright (c) 2004-2010 The Trustees of Indiana University and Indiana + * University Research and Technology + * Corporation. All rights reserved. + * Copyright (c) 2004-2011 The University of Tennessee and The University + * of Tennessee Research Foundation. All rights + * reserved. + * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, + * University of Stuttgart. All rights reserved. + * Copyright (c) 2004-2005 The Regents of the University of California. + * All rights reserved. + * Copyright (c) 2006-2013 Los Alamos National Security, LLC. + * All rights reserved. + * Copyright (c) 2009 Cisco Systems, Inc. All rights reserved. + * Copyright (c) 2011 Oak Ridge National Labs. All rights reserved. + * Copyright (c) 2013-2015 Intel, Inc. All rights reserved. + * Copyright (c) 2014 Research Organization for Information Science + * and Technology (RIST). All rights reserved. + * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ + +#include "orte_config.h" + +#ifdef HAVE_UNISTD_H +#include +#endif +#include +#ifdef HAVE_SYS_UIO_H +#include +#endif +#ifdef HAVE_NET_UIO_H +#include +#endif +#ifdef HAVE_SYS_TYPES_H +#include +#endif +#include "opal/opal_socket_errno.h" +#ifdef HAVE_NETINET_IN_H +#include +#endif +#ifdef HAVE_ARPA_INET_H +#include +#endif +#ifdef HAVE_NETINET_TCP_H +#include +#endif + +#include "opal/types.h" +#include "opal_stdint.h" +#include "opal/mca/backtrace/backtrace.h" +#include "opal/mca/base/mca_base_var.h" +#include "opal/mca/sec/sec.h" +#include "opal/util/output.h" +#include "opal/util/net.h" +#include "opal/util/error.h" +#include "opal/util/fd.h" +#include "opal/class/opal_hash_table.h" +#include "opal/mca/event/event.h" + +#include "orte/util/name_fns.h" +#include "orte/mca/state/state.h" +#include "orte/runtime/orte_globals.h" +#include "orte/mca/errmgr/errmgr.h" +#include "orte/mca/ess/ess.h" +#include "orte/runtime/orte_wait.h" + +#include "oob_usock.h" +#include "orte/mca/oob/usock/oob_usock_component.h" +#include "orte/mca/oob/usock/oob_usock_peer.h" +#include "orte/mca/oob/usock/oob_usock_connection.h" + +static void usock_peer_event_init(mca_oob_usock_peer_t* peer); +static int usock_peer_send_connect_ack(mca_oob_usock_peer_t* peer); +static int usock_peer_send_blocking(mca_oob_usock_peer_t* peer, + int sd, void* data, size_t size); +static bool usock_peer_recv_blocking(mca_oob_usock_peer_t* peer, + int sd, void* data, size_t size); +static void usock_peer_connected(mca_oob_usock_peer_t* peer); + +static int usock_peer_create_socket(mca_oob_usock_peer_t* peer) +{ + int flags; + + if (peer->sd > 0) { + return ORTE_SUCCESS; + } + + OPAL_OUTPUT_VERBOSE((1, orte_oob_base_framework.framework_output, + "%s oob:usock:peer creating socket to %s", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + ORTE_NAME_PRINT(&(peer->name)))); + peer->sd = socket(PF_UNIX, SOCK_STREAM, 0); + + if (peer->sd < 0) { + opal_output(0, "%s-%s usock_peer_create_socket: socket() failed: %s (%d)\n", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + ORTE_NAME_PRINT(&(peer->name)), + strerror(opal_socket_errno), + opal_socket_errno); + return ORTE_ERR_UNREACH; + } + /* Set this fd to be close-on-exec so that subsequent children don't see it */ + if (opal_fd_set_cloexec(peer->sd) != OPAL_SUCCESS) { + opal_output(0, "%s unable to set socket to CLOEXEC", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)); + close(peer->sd); + peer->sd = -1; + return ORTE_ERROR; + } + + /* setup event callbacks */ + usock_peer_event_init(peer); + + /* setup the socket as non-blocking */ + if (peer->sd >= 0) { + if ((flags = fcntl(peer->sd, F_GETFL, 0)) < 0) { + opal_output(0, "%s-%s usock_peer_connect: fcntl(F_GETFL) failed: %s (%d)\n", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + ORTE_NAME_PRINT(&(peer->name)), + strerror(opal_socket_errno), + opal_socket_errno); + } else { + flags |= O_NONBLOCK; + if(fcntl(peer->sd, F_SETFL, flags) < 0) + opal_output(0, "%s-%s usock_peer_connect: fcntl(F_SETFL) failed: %s (%d)\n", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + ORTE_NAME_PRINT(&(peer->name)), + strerror(opal_socket_errno), + opal_socket_errno); + } + } + + return ORTE_SUCCESS; +} + + +/* + * Try connecting to a peer + */ +void mca_oob_usock_peer_try_connect(int fd, short args, void *cbdata) +{ + mca_oob_usock_conn_op_t *op = (mca_oob_usock_conn_op_t*)cbdata; + mca_oob_usock_peer_t *peer = op->peer; + int rc; + opal_socklen_t addrlen = 0; + + opal_output_verbose(OOB_USOCK_DEBUG_CONNECT, orte_oob_base_framework.framework_output, + "%s orte_usock_peer_try_connect: " + "attempting to connect to proc %s", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + ORTE_NAME_PRINT(&(peer->name))); + + rc = usock_peer_create_socket(peer); + if (ORTE_SUCCESS != rc) { + /* FIXME: we cannot create a USOCK socket - report + * back to the component that this peer is + * unreachable so it can remove the peer + * from its list and report back to the base + * NOTE: this could be a reconnect attempt, + * so we also need to mark any queued messages + * and return them as "unreachable" + */ + opal_output(0, "%s CANNOT CREATE SOCKET", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)); + ORTE_FORCED_TERMINATE(1); + OBJ_RELEASE(op); + return; + } + + opal_output_verbose(OOB_USOCK_DEBUG_CONNECT, orte_oob_base_framework.framework_output, + "%s orte_usock_peer_try_connect: " + "attempting to connect to proc %s on socket %d", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + ORTE_NAME_PRINT(&(peer->name)), peer->sd); + + addrlen = sizeof(struct sockaddr_un); + opal_output_verbose(OOB_USOCK_DEBUG_CONNECT, orte_oob_base_framework.framework_output, + "%s orte_usock_peer_try_connect: " + "attempting to connect to proc %s - %d retries", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + ORTE_NAME_PRINT(&(peer->name)), + peer->retries); + + retry_connect: + peer->retries++; + if (connect(peer->sd, (struct sockaddr *) &mca_oob_usock_component.address, addrlen) < 0) { + /* non-blocking so wait for completion */ + if (opal_socket_errno == EINPROGRESS || opal_socket_errno == EWOULDBLOCK) { + opal_output_verbose(OOB_USOCK_DEBUG_CONNECT, orte_oob_base_framework.framework_output, + "%s waiting for connect completion to %s - activating send event", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + ORTE_NAME_PRINT(&peer->name)); + /* just ensure the send_event is active */ + if (!peer->send_ev_active) { + opal_event_add(&peer->send_event, 0); + peer->send_ev_active = true; + } + OBJ_RELEASE(op); + return; + } + + /* Some kernels (Linux 2.6) will automatically software + abort a connection that was ECONNREFUSED on the last + attempt, without even trying to establish the + connection. Handle that case in a semi-rational + way by trying twice before giving up */ + if (ECONNABORTED == opal_socket_errno) { + if (peer->retries < mca_oob_usock_component.max_retries) { + opal_output_verbose(OOB_USOCK_DEBUG_CONNECT, orte_oob_base_framework.framework_output, + "%s connection aborted by OS to %s - retrying", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + ORTE_NAME_PRINT(&peer->name)); + goto retry_connect; + } else { + /* We were unsuccessful in establishing this connection, and are + * not likely to suddenly become successful, + */ + opal_output_verbose(OOB_USOCK_DEBUG_CONNECT, orte_oob_base_framework.framework_output, + "%s orte_usock_peer_try_connect: " + "Connection across unix domain socket to local proc %s failed", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + ORTE_NAME_PRINT(&peer->name)); + peer->state = MCA_OOB_USOCK_FAILED; + CLOSE_THE_SOCKET(peer->sd); + /* let the USOCK component know that this module failed to make + * the connection so it can try other modules, and/or fail back + * to the OOB level so another component can try. This will activate + * an event in the component event base, and so it will fire async + * from us if we are in our own progress thread + */ + ORTE_ACTIVATE_USOCK_CMP_OP(peer, mca_oob_usock_component_failed_to_connect); + OBJ_RELEASE(op); + return; + } + } + } + + /* connection succeeded */ + peer->retries = 0; + + opal_output_verbose(OOB_USOCK_DEBUG_CONNECT, orte_oob_base_framework.framework_output, + "%s orte_usock_peer_try_connect: " + "Connection across to proc %s succeeded", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + ORTE_NAME_PRINT(&peer->name)); + /* setup our recv to catch the return ack call */ + if (!peer->recv_ev_active) { + opal_event_add(&peer->recv_event, 0); + peer->recv_ev_active = true; + } + + /* send our globally unique process identifier to the peer */ + if (ORTE_SUCCESS == (rc = usock_peer_send_connect_ack(peer))) { + peer->state = MCA_OOB_USOCK_CONNECT_ACK; + } else { + opal_output(0, + "%s orte_usock_peer_try_connect: " + "usock_peer_send_connect_ack to proc %s failed: %s (%d)", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + ORTE_NAME_PRINT(&(peer->name)), + opal_strerror(rc), rc); + ORTE_FORCED_TERMINATE(1); + } + + OBJ_RELEASE(op); +} + +static int usock_peer_send_connect_ack(mca_oob_usock_peer_t* peer) +{ + char *msg; + mca_oob_usock_hdr_t hdr; + int rc; + size_t sdsize; + char *cred; + size_t credsize; + + opal_output_verbose(OOB_USOCK_DEBUG_CONNECT, orte_oob_base_framework.framework_output, + "%s SEND CONNECT ACK", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)); + + /* send a handshake that includes our process identifier + * to ensure we are talking to another OMPI process + */ + hdr.origin = *ORTE_PROC_MY_NAME; + hdr.dst = peer->name; + hdr.type = MCA_OOB_USOCK_IDENT; + hdr.tag = 0; + + /* get our security credential*/ + if (OPAL_SUCCESS != (rc = opal_sec.get_my_credential(peer->auth_method, + ORTE_PROC_MY_NAME, &cred, &credsize))) { + ORTE_ERROR_LOG(rc); + return rc; + } + + /* set the number of bytes to be read beyond the header */ + hdr.nbytes = strlen(orte_version_string) + 1 + credsize; + + /* create a space for our message */ + sdsize = (sizeof(hdr) + strlen(orte_version_string) + 1 + credsize); + if (NULL == (msg = (char*)malloc(sdsize))) { + return ORTE_ERR_OUT_OF_RESOURCE; + } + memset(msg, 0, sdsize); + + /* load the message */ + memcpy(msg, &hdr, sizeof(hdr)); + memcpy(msg+sizeof(hdr), orte_version_string, strlen(orte_version_string)); + memcpy(msg+sizeof(hdr)+strlen(orte_version_string)+1, cred, credsize); + free(cred); + + if (ORTE_SUCCESS != usock_peer_send_blocking(peer, peer->sd, msg, sdsize)) { + ORTE_ERROR_LOG(ORTE_ERR_UNREACH); + free(msg); + return ORTE_ERR_UNREACH; + } + free(msg); + return ORTE_SUCCESS; +} + +/* + * Initialize events to be used by the peer instance for USOCK select/poll callbacks. + */ +static void usock_peer_event_init(mca_oob_usock_peer_t* peer) +{ + if (peer->sd >= 0) { + opal_event_set(mca_oob_usock_module.ev_base, + &peer->recv_event, + peer->sd, + OPAL_EV_READ|OPAL_EV_PERSIST, + mca_oob_usock_recv_handler, + peer); + opal_event_set_priority(&peer->recv_event, ORTE_MSG_PRI); + if (peer->recv_ev_active) { + opal_event_del(&peer->recv_event); + peer->recv_ev_active = false; + } + opal_event_set(mca_oob_usock_module.ev_base, + &peer->send_event, + peer->sd, + OPAL_EV_WRITE|OPAL_EV_PERSIST, + mca_oob_usock_send_handler, + peer); + opal_event_set_priority(&peer->send_event, ORTE_MSG_PRI); + if (peer->send_ev_active) { + opal_event_del(&peer->send_event); + peer->send_ev_active = false; + } + } +} + +/* + * Check the status of the connection. If the connection failed, will retry + * later. Otherwise, send this processes identifier to the peer on the + * newly connected socket. + */ +void mca_oob_usock_peer_complete_connect(mca_oob_usock_peer_t *peer) +{ + int so_error = 0, rc; + opal_socklen_t so_length = sizeof(so_error); + orte_oob_base_peer_t *bpr; + uint64_t ui64; + mca_oob_usock_peer_t *pr; + + opal_output_verbose(OOB_USOCK_DEBUG_CONNECT, orte_oob_base_framework.framework_output, + "%s:usock:complete_connect called for peer %s on socket %d", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + ORTE_NAME_PRINT(&peer->name), peer->sd); + + /* check connect completion status */ + if (getsockopt(peer->sd, SOL_SOCKET, SO_ERROR, (char *)&so_error, &so_length) < 0) { + opal_output(0, "%s usock_peer_complete_connect: getsockopt() to %s failed: %s (%d)\n", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + ORTE_NAME_PRINT(&(peer->name)), + strerror(opal_socket_errno), + opal_socket_errno); + peer->state = MCA_OOB_USOCK_FAILED; + mca_oob_usock_peer_close(peer); + return; + } + + if (so_error == EINPROGRESS) { + opal_output_verbose(OOB_USOCK_DEBUG_CONNECT, orte_oob_base_framework.framework_output, + "%s:usock:send:handler still in progress", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)); + return; + } else if (so_error == ECONNREFUSED || so_error == ETIMEDOUT) { + opal_output_verbose(OOB_USOCK_DEBUG_CONNECT, orte_oob_base_framework.framework_output, + "%s-%s usock_peer_complete_connect: connection failed: %s (%d)", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + ORTE_NAME_PRINT(&(peer->name)), + strerror(so_error), + so_error); + mca_oob_usock_peer_close(peer); + return; + } else if (so_error != 0) { + /* No need to worry about the return code here - we return regardless + at this point, and if an error did occur a message has already been + printed for the user */ + opal_output_verbose(OOB_USOCK_DEBUG_CONNECT, orte_oob_base_framework.framework_output, + "%s-%s usock_peer_complete_connect: " + "connection failed with error %d", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + ORTE_NAME_PRINT(&(peer->name)), so_error); + mca_oob_usock_peer_close(peer); + return; + } + + opal_output_verbose(OOB_USOCK_DEBUG_CONNECT, orte_oob_base_framework.framework_output, + "%s usock_peer_complete_connect: " + "sending ack to %s", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + ORTE_NAME_PRINT(&(peer->name))); + + if (usock_peer_send_connect_ack(peer) == ORTE_SUCCESS) { + peer->state = MCA_OOB_USOCK_CONNECT_ACK; + opal_output_verbose(OOB_USOCK_DEBUG_CONNECT, orte_oob_base_framework.framework_output, + "%s usock_peer_complete_connect: " + "setting read event on connection to %s", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + ORTE_NAME_PRINT(&(peer->name))); + + if (!peer->recv_ev_active) { + opal_event_add(&peer->recv_event, 0); + peer->recv_ev_active = true; + } + } else { + opal_output(0, "%s usock_peer_complete_connect: unable to send connect ack to %s", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + ORTE_NAME_PRINT(&(peer->name))); + peer->state = MCA_OOB_USOCK_FAILED; + mca_oob_usock_peer_close(peer); + } + + /* make sure the OOB knows that we are handling this peer - we + * are in the same event base as the OOB base, so we can + * directly access its storage + */ + memcpy(&ui64, (char*)&(peer->name), sizeof(uint64_t)); + if (OPAL_SUCCESS != opal_hash_table_get_value_uint64(&orte_oob_base.peers, + ui64, (void**)&bpr) || NULL == bpr) { + bpr = OBJ_NEW(orte_oob_base_peer_t); + } + opal_bitmap_set_bit(&bpr->addressable, mca_oob_usock_component.super.idx); + bpr->component = &mca_oob_usock_component.super; + if (OPAL_SUCCESS != (rc = opal_hash_table_set_value_uint64(&orte_oob_base.peers, + ui64, bpr))) { + ORTE_ERROR_LOG(rc); + } + /* record it locally too */ + if (OPAL_SUCCESS != opal_hash_table_get_value_uint64(&mca_oob_usock_module.peers, + ui64, (void**)&pr) || NULL == pr) { + pr = OBJ_NEW(mca_oob_usock_peer_t); + pr->name = peer->name; + opal_hash_table_set_value_uint64(&mca_oob_usock_module.peers, ui64, pr); + } +} + +/* + * A blocking send on a non-blocking socket. Used to send the small amount of connection + * information that identifies the peers endpoint. + */ +static int usock_peer_send_blocking(mca_oob_usock_peer_t* peer, + int sd, void* data, size_t size) +{ + unsigned char* ptr = (unsigned char*)data; + size_t cnt = 0; + int retval; + + opal_output_verbose(OOB_USOCK_DEBUG_CONNECT, orte_oob_base_framework.framework_output, + "%s send blocking of %"PRIsize_t" bytes to socket %d", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + size, sd); + + while (cnt < size) { + retval = send(sd, (char*)ptr+cnt, size-cnt, 0); + if (retval < 0) { + if (opal_socket_errno != EINTR && opal_socket_errno != EAGAIN && opal_socket_errno != EWOULDBLOCK) { + opal_output(0, "%s usock_peer_send_blocking: send() to socket %d failed: %s (%d)\n", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), sd, + strerror(opal_socket_errno), + opal_socket_errno); + peer->state = MCA_OOB_USOCK_FAILED; + mca_oob_usock_peer_close(peer); + return ORTE_ERR_UNREACH; + } + continue; + } + cnt += retval; + } + + opal_output_verbose(OOB_USOCK_DEBUG_CONNECT, orte_oob_base_framework.framework_output, + "%s blocking send complete to socket %d", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), sd); + + return ORTE_SUCCESS; +} + +/* + * Receive the peers globally unique process identification from a newly + * connected socket and verify the expected response. If so, move the + * socket to a connected state. + */ +int mca_oob_usock_peer_recv_connect_ack(mca_oob_usock_peer_t* pr, int sd, + mca_oob_usock_hdr_t *dhdr) +{ + char *msg; + char *version; + int rc, cmpval; + char *cred; + size_t credsize; + mca_oob_usock_peer_t *peer; + mca_oob_usock_hdr_t hdr; + uint64_t *ui64; + + opal_output_verbose(OOB_USOCK_DEBUG_CONNECT, orte_oob_base_framework.framework_output, + "%s RECV CONNECT ACK FROM %s ON SOCKET %d", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + (NULL == pr) ? "UNKNOWN" : ORTE_NAME_PRINT(&pr->name), sd); + + peer = pr; + /* ensure all is zero'd */ + memset(&hdr, 0, sizeof(mca_oob_usock_hdr_t)); + + if (usock_peer_recv_blocking(peer, sd, &hdr, sizeof(mca_oob_usock_hdr_t))) { + if (NULL != peer) { + /* If the peer state is CONNECT_ACK, then we were waiting for + * the connection to be ack'd + */ + if (peer->state != MCA_OOB_USOCK_CONNECT_ACK) { + /* handshake broke down - abort this connection */ + opal_output(0, "%s RECV CONNECT BAD HANDSHAKE FROM %s ON SOCKET %d", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + ORTE_NAME_PRINT(&peer->name), sd); + mca_oob_usock_peer_close(peer); + return ORTE_ERR_UNREACH; + } + } + } else { + /* unable to complete the recv */ + opal_output_verbose(OOB_USOCK_DEBUG_CONNECT, orte_oob_base_framework.framework_output, + "%s unable to complete recv of connect-ack from %s ON SOCKET %d", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + (NULL == peer) ? "UNKNOWN" : ORTE_NAME_PRINT(&peer->name), sd); + return ORTE_ERR_UNREACH; + } + /* if the requestor wanted the header returned, then do so now */ + if (NULL != dhdr) { + *dhdr = hdr; + } + + if (MCA_OOB_USOCK_PROBE == hdr.type) { + /* send a header back */ + hdr.type = MCA_OOB_USOCK_PROBE; + hdr.dst = hdr.origin; + hdr.origin = *ORTE_PROC_MY_NAME; + usock_peer_send_blocking(peer, sd, &hdr, sizeof(mca_oob_usock_hdr_t)); + CLOSE_THE_SOCKET(sd); + return ORTE_SUCCESS; + } + + if (hdr.type != MCA_OOB_USOCK_IDENT) { + opal_output(0, "usock_peer_recv_connect_ack: invalid header type: %d\n", hdr.type); + if (NULL != peer) { + peer->state = MCA_OOB_USOCK_FAILED; + mca_oob_usock_peer_close(peer); + } else { + CLOSE_THE_SOCKET(sd); + } + return ORTE_ERR_UNREACH; + } + + opal_output_verbose(OOB_USOCK_DEBUG_CONNECT, orte_oob_base_framework.framework_output, + "%s connect-ack recvd from %s", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + (NULL == peer) ? "UNKNOWN" : ORTE_NAME_PRINT(&peer->name)); + + /* if we don't already have it, get the peer */ + if (NULL == peer) { + peer = mca_oob_usock_peer_lookup(&hdr.origin); + if (NULL == peer) { + opal_output_verbose(OOB_USOCK_DEBUG_CONNECT, orte_oob_base_framework.framework_output, + "%s mca_oob_usock_recv_connect: connection from new peer", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)); + peer = OBJ_NEW(mca_oob_usock_peer_t); + peer->name = hdr.origin; + peer->state = MCA_OOB_USOCK_ACCEPTING; + peer->sd = sd; + ui64 = (uint64_t*)(&peer->name); + if (OPAL_SUCCESS != opal_hash_table_set_value_uint64(&mca_oob_usock_module.peers, (*ui64), peer)) { + OBJ_RELEASE(peer); + CLOSE_THE_SOCKET(sd); + return ORTE_ERR_UNREACH; + } + } else { + /* check for a race condition - if I was in the process of + * creating a connection to the peer, or have already established + * such a connection, then we need to reject this connection. We will + * let the higher ranked process retry - if I'm the lower ranked + * process, I'll simply defer until I receive the request + */ + if (MCA_OOB_USOCK_CONNECTED == peer->state || + MCA_OOB_USOCK_CONNECTING == peer->state || + MCA_OOB_USOCK_CONNECT_ACK == peer->state) { + opal_output_verbose(OOB_USOCK_DEBUG_CONNECT, orte_oob_base_framework.framework_output, + "%s SIMUL CONNECTION WITH %s", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + ORTE_NAME_PRINT(&hdr.origin)); + if (peer->recv_ev_active) { + opal_event_del(&peer->recv_event); + peer->recv_ev_active = false; + } + if (peer->send_ev_active) { + opal_event_del(&peer->send_event); + peer->send_ev_active = false; + } + if (0 < peer->sd) { + CLOSE_THE_SOCKET(peer->sd); + peer->sd = -1; + } + CLOSE_THE_SOCKET(sd); + peer->retries = 0; + cmpval = orte_util_compare_name_fields(ORTE_NS_CMP_ALL, &hdr.origin, ORTE_PROC_MY_NAME); + if (OPAL_VALUE1_GREATER == cmpval) { + /* force the other end to retry the connection */ + peer->state = MCA_OOB_USOCK_UNCONNECTED; + return ORTE_ERR_UNREACH; + } else { + /* retry the connection */ + peer->state = MCA_OOB_USOCK_CONNECTING; + ORTE_ACTIVATE_USOCK_CONN_STATE(peer, mca_oob_usock_peer_try_connect); + return ORTE_ERR_UNREACH; + } + } + } + } else { + /* compare the peers name to the expected value */ + if (OPAL_EQUAL != orte_util_compare_name_fields(ORTE_NS_CMP_ALL, &peer->name, &hdr.origin)) { + opal_output(0, "%s usock_peer_recv_connect_ack: " + "received unexpected process identifier %s from %s\n", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + ORTE_NAME_PRINT(&(hdr.origin)), + ORTE_NAME_PRINT(&(peer->name))); + peer->state = MCA_OOB_USOCK_FAILED; + mca_oob_usock_peer_close(peer); + return ORTE_ERR_UNREACH; + } + } + + opal_output_verbose(OOB_USOCK_DEBUG_CONNECT, orte_oob_base_framework.framework_output, + "%s connect-ack header from %s is okay", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + ORTE_NAME_PRINT(&peer->name)); + + /* get the authentication and version payload */ + if (NULL == (msg = (char*)malloc(hdr.nbytes))) { + peer->state = MCA_OOB_USOCK_FAILED; + mca_oob_usock_peer_close(peer); + return ORTE_ERR_OUT_OF_RESOURCE; + } + if (!usock_peer_recv_blocking(peer, sd, msg, hdr.nbytes)) { + /* unable to complete the recv */ + opal_output_verbose(OOB_USOCK_DEBUG_CONNECT, orte_oob_base_framework.framework_output, + "%s unable to complete recv of connect-ack from %s ON SOCKET %d", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + ORTE_NAME_PRINT(&peer->name), peer->sd); + free(msg); + return ORTE_ERR_UNREACH; + } + + /* check that this is from a matching version */ + version = (char*)(msg); + if (0 != strcmp(version, orte_version_string)) { + opal_output(0, "%s usock_peer_recv_connect_ack: " + "received different version from %s: %s instead of %s\n", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + ORTE_NAME_PRINT(&(peer->name)), + version, orte_version_string); + peer->state = MCA_OOB_USOCK_FAILED; + mca_oob_usock_peer_close(peer); + free(msg); + return ORTE_ERR_UNREACH; + } + + opal_output_verbose(OOB_USOCK_DEBUG_CONNECT, orte_oob_base_framework.framework_output, + "%s connect-ack version from %s matches ours", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + ORTE_NAME_PRINT(&peer->name)); + + /* check security token */ + cred = (char*)(msg + strlen(version) + 1); + credsize = hdr.nbytes - strlen(version) - 1; + if (OPAL_SUCCESS != (rc = opal_sec.authenticate(cred, credsize, &peer->auth_method))) { + ORTE_ERROR_LOG(rc); + } + free(msg); + + opal_output_verbose(OOB_USOCK_DEBUG_CONNECT, orte_oob_base_framework.framework_output, + "%s connect-ack %s authenticated", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + ORTE_NAME_PRINT(&peer->name)); + + /* if the requestor wanted the header returned, then they + * will complete their processing + */ + if (NULL != dhdr) { + return ORTE_SUCCESS; + } + + /* set the peer into the component and OOB-level peer tables to indicate + * that we know this peer and we will be handling him + */ + ORTE_ACTIVATE_USOCK_CMP_OP(peer, mca_oob_usock_component_set_module); + + /* connected */ + usock_peer_connected(peer); + if (OOB_USOCK_DEBUG_CONNECT <= opal_output_get_verbosity(orte_oob_base_framework.framework_output)) { + mca_oob_usock_peer_dump(peer, "connected"); + } + return ORTE_SUCCESS; +} + +/* + * Setup peer state to reflect that connection has been established, + * and start any pending sends. + */ +static void usock_peer_connected(mca_oob_usock_peer_t* peer) +{ + opal_output_verbose(OOB_USOCK_DEBUG_CONNECT, orte_oob_base_framework.framework_output, + "%s-%s usock_peer_connected on socket %d", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + ORTE_NAME_PRINT(&(peer->name)), peer->sd); + + if (peer->timer_ev_active) { + opal_event_del(&peer->timer_event); + peer->timer_ev_active = false; + } + peer->state = MCA_OOB_USOCK_CONNECTED; + + /* initiate send of first message on queue */ + if (NULL == peer->send_msg) { + peer->send_msg = (mca_oob_usock_send_t*) + opal_list_remove_first(&peer->send_queue); + } + if (NULL != peer->send_msg && !peer->send_ev_active) { + opal_event_add(&peer->send_event, 0); + peer->send_ev_active = true; + } +} + +/* + * Remove any event registrations associated with the socket + * and update the peer state to reflect the connection has + * been closed. + */ +void mca_oob_usock_peer_close(mca_oob_usock_peer_t *peer) +{ + mca_oob_usock_send_t *snd; + + opal_output_verbose(OOB_USOCK_DEBUG_CONNECT, orte_oob_base_framework.framework_output, + "%s usock_peer_close for %s sd %d state %s", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + ORTE_NAME_PRINT(&(peer->name)), + peer->sd, mca_oob_usock_state_print(peer->state)); + + peer->state = MCA_OOB_USOCK_CLOSED; + + /* release the socket */ + close(peer->sd); + + /* inform the component-level that we have lost a connection so + * it can decide what to do about it. + */ + ORTE_ACTIVATE_USOCK_CMP_OP(peer, mca_oob_usock_component_lost_connection); + + if (orte_orteds_term_ordered || orte_finalizing || orte_abnormal_term_ordered) { + /* nothing more to do */ + return; + } + + /* FIXME: push any queued messages back onto the OOB for retry - note that + * this must be done after the prior call to ensure that the component + * processes the "lost connection" notice before the OOB begins to + * handle these recycled messages. This prevents us from unintentionally + * attempting to send the message again across the now-failed interface + */ + if (NULL != peer->send_msg) { + } + while (NULL != (snd = (mca_oob_usock_send_t*)opal_list_remove_first(&peer->send_queue))) { + } +} + +/* + * A blocking recv on a non-blocking socket. Used to receive the small amount of connection + * information that identifies the peers endpoint. + */ +static bool usock_peer_recv_blocking(mca_oob_usock_peer_t* peer, + int sd, void* data, size_t size) +{ + unsigned char* ptr = (unsigned char*)data; + size_t cnt = 0; + + opal_output_verbose(OOB_USOCK_DEBUG_CONNECT, orte_oob_base_framework.framework_output, + "%s waiting for connect ack from %s", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + (NULL == peer) ? "UNKNOWN" : ORTE_NAME_PRINT(&(peer->name))); + + while (cnt < size) { + int retval = recv(sd, (char *)ptr+cnt, size-cnt, 0); + + /* remote closed connection */ + if (retval == 0) { + opal_output_verbose(OOB_USOCK_DEBUG_CONNECT, orte_oob_base_framework.framework_output, + "%s-%s usock_peer_recv_blocking: " + "peer closed connection: peer state %d", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + (NULL == peer) ? "UNKNOWN" : ORTE_NAME_PRINT(&(peer->name)), + (NULL == peer) ? 0 : peer->state); + mca_oob_usock_peer_close(peer); + return false; + } + + /* socket is non-blocking so handle errors */ + if (retval < 0) { + if (opal_socket_errno != EINTR && + opal_socket_errno != EAGAIN && + opal_socket_errno != EWOULDBLOCK) { + if (peer->state == MCA_OOB_USOCK_CONNECT_ACK) { + /* If we overflow the listen backlog, it's + possible that even though we finished the three + way handshake, the remote host was unable to + transition the connection from half connected + (received the initial SYN) to fully connected + (in the listen backlog). We likely won't see + the failure until we try to receive, due to + timing and the like. The first thing we'll get + in that case is a RST packet, which receive + will turn into a connection reset by peer + errno. In that case, leave the socket in + CONNECT_ACK and propogate the error up to + recv_connect_ack, who will try to establish the + connection again */ + opal_output_verbose(OOB_USOCK_DEBUG_CONNECT, orte_oob_base_framework.framework_output, + "%s connect ack received error %s from %s", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + strerror(opal_socket_errno), + (NULL == peer) ? "UNKNOWN" : ORTE_NAME_PRINT(&(peer->name))); + return false; + } else { + opal_output(0, + "%s usock_peer_recv_blocking: " + "recv() failed for %s: %s (%d)\n", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + (NULL == peer) ? "UNKNOWN" : ORTE_NAME_PRINT(&(peer->name)), + strerror(opal_socket_errno), + opal_socket_errno); + if (NULL != peer) { + peer->state = MCA_OOB_USOCK_FAILED; + mca_oob_usock_peer_close(peer); + } else { + CLOSE_THE_SOCKET(sd); + } + return false; + } + } + continue; + } + cnt += retval; + } + + opal_output_verbose(OOB_USOCK_DEBUG_CONNECT, orte_oob_base_framework.framework_output, + "%s connect ack received from %s", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + (NULL == peer) ? "UNKNOWN" : ORTE_NAME_PRINT(&(peer->name))); + return true; +} + +/* + * Routine for debugging to print the connection state and socket options + */ +void mca_oob_usock_peer_dump(mca_oob_usock_peer_t* peer, const char* msg) +{ + char buff[255]; + int nodelay,flags; + + if ((flags = fcntl(peer->sd, F_GETFL, 0)) < 0) { + opal_output(0, "usock_peer_dump: fcntl(F_GETFL) failed: %s (%d)\n", + strerror(opal_socket_errno), + opal_socket_errno); + } +#if defined(USOCK_NODELAY) + optlen = sizeof(nodelay); + if (getsockopt(peer->sd, IPPROTO_USOCK, USOCK_NODELAY, (char *)&nodelay, &optlen) < 0) { + opal_output(0, "usock_peer_dump: USOCK_NODELAY option: %s (%d)\n", + strerror(opal_socket_errno), + opal_socket_errno); + } +#else + nodelay = 0; +#endif + + snprintf(buff, sizeof(buff), "%s-%s %s: nodelay %d flags %08x\n", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + ORTE_NAME_PRINT(&(peer->name)), + msg, nodelay, flags); + opal_output(0, "%s", buff); +} + +/* + * Accept incoming connection - if not already connected + */ + +bool mca_oob_usock_peer_accept(mca_oob_usock_peer_t* peer) +{ + opal_output_verbose(OOB_USOCK_DEBUG_CONNECT, orte_oob_base_framework.framework_output, + "%s usock:peer_accept called for peer %s in state %s on socket %d", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + ORTE_NAME_PRINT(&peer->name), + mca_oob_usock_state_print(peer->state), peer->sd); + + if (peer->state != MCA_OOB_USOCK_CONNECTED) { + + usock_peer_event_init(peer); + + if (usock_peer_send_connect_ack(peer) != ORTE_SUCCESS) { + opal_output(0, "%s-%s usock_peer_accept: " + "usock_peer_send_connect_ack failed\n", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + ORTE_NAME_PRINT(&(peer->name))); + peer->state = MCA_OOB_USOCK_FAILED; + mca_oob_usock_peer_close(peer); + return false; + } + + /* set the peer into the component and OOB-level peer tables to indicate + * that we know this peer and we will be handling him + */ + ORTE_ACTIVATE_USOCK_CMP_OP(peer, mca_oob_usock_component_set_module); + + usock_peer_connected(peer); + if (!peer->recv_ev_active) { + opal_event_add(&peer->recv_event, 0); + peer->recv_ev_active = true; + } + /* if a message is waiting to be sent, ensure the send event is active */ + if (NULL != peer->send_msg && !peer->send_ev_active) { + opal_event_add(&peer->send_event, 0); + peer->send_ev_active = true; + } + if (OOB_USOCK_DEBUG_CONNECT <= opal_output_get_verbosity(orte_oob_base_framework.framework_output)) { + mca_oob_usock_peer_dump(peer, "accepted"); + } + return true; + } + + opal_output_verbose(OOB_USOCK_DEBUG_CONNECT, orte_oob_base_framework.framework_output, + "%s usock:peer_accept ignored for peer %s in state %s on socket %d", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + ORTE_NAME_PRINT(&peer->name), + mca_oob_usock_state_print(peer->state), peer->sd); + return false; +} diff --git a/orte/mca/oob/usock/oob_usock_connection.h b/orte/mca/oob/usock/oob_usock_connection.h new file mode 100644 index 00000000000..fe98f6e09c9 --- /dev/null +++ b/orte/mca/oob/usock/oob_usock_connection.h @@ -0,0 +1,102 @@ +/* + * Copyright (c) 2004-2007 The Trustees of Indiana University and Indiana + * University Research and Technology + * Corporation. All rights reserved. + * Copyright (c) 2004-2006 The University of Tennessee and The University + * of Tennessee Research Foundation. All rights + * reserved. + * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, + * University of Stuttgart. All rights reserved. + * Copyright (c) 2004-2005 The Regents of the University of California. + * All rights reserved. + * Copyright (c) 2006-2013 Los Alamos National Security, LLC. + * All rights reserved. + * Copyright (c) 2010-2011 Cisco Systems, Inc. All rights reserved. + * Copyright (c) 2013 Intel, Inc. All rights reserved. + * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ + +#ifndef _MCA_OOB_USOCK_CONNECTION_H_ +#define _MCA_OOB_USOCK_CONNECTION_H_ + +#include "orte_config.h" + +#ifdef HAVE_SYS_TYPES_H +#include +#endif +#ifdef HAVE_SYS_SOCKET_H +#include +#endif + +#include "oob_usock.h" +#include "oob_usock_peer.h" + +/* State machine for connection operations */ +typedef struct { + opal_object_t super; + mca_oob_usock_peer_t *peer; + opal_event_t ev; +} mca_oob_usock_conn_op_t; +OBJ_CLASS_DECLARATION(mca_oob_usock_conn_op_t); + +#define CLOSE_THE_SOCKET(socket) \ + do { \ + shutdown(socket, 2); \ + close(socket); \ + } while(0) + +#define ORTE_ACTIVATE_USOCK_CONN_STATE(p, cbfunc) \ + do { \ + mca_oob_usock_conn_op_t *cop; \ + opal_output_verbose(5, orte_oob_base_framework.framework_output, \ + "%s:[%s:%d] connect to %s", \ + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), \ + __FILE__, __LINE__, \ + ORTE_NAME_PRINT((&(p)->name))); \ + cop = OBJ_NEW(mca_oob_usock_conn_op_t); \ + cop->peer = (p); \ + opal_event_set(mca_oob_usock_module.ev_base, &cop->ev, -1, \ + OPAL_EV_WRITE, (cbfunc), cop); \ + opal_event_set_priority(&cop->ev, ORTE_MSG_PRI); \ + opal_event_active(&cop->ev, OPAL_EV_WRITE, 1); \ + } while(0); + +#define ORTE_ACTIVATE_USOCK_ACCEPT_STATE(s, a, cbfunc) \ + do { \ + mca_oob_usock_conn_op_t *cop; \ + cop = OBJ_NEW(mca_oob_usock_conn_op_t); \ + opal_event_set(mca_oob_usock_module.ev_base, &cop->ev, s, \ + OPAL_EV_READ, (cbfunc), cop); \ + opal_event_set_priority(&cop->ev, ORTE_MSG_PRI); \ + opal_event_add(&cop->ev, 0); \ + } while(0); + +#define ORTE_RETRY_USOCK_CONN_STATE(p, cbfunc, tv) \ + do { \ + mca_oob_usock_conn_op_t *cop; \ + opal_output_verbose(5, orte_oob_base_framework.framework_output, \ + "%s:[%s:%d] retry connect to %s", \ + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), \ + __FILE__, __LINE__, \ + ORTE_NAME_PRINT((&(p)->name))); \ + cop = OBJ_NEW(mca_oob_usock_conn_op_t); \ + cop->peer = (p); \ + opal_event_evtimer_set(mca_oob_usock_module.ev_base, \ + &cop->ev, \ + (cbfunc), cop); \ + opal_event_evtimer_add(&cop->ev, (tv)); \ + } while(0); + +ORTE_MODULE_DECLSPEC void mca_oob_usock_peer_try_connect(int fd, short args, void *cbdata); +ORTE_MODULE_DECLSPEC void mca_oob_usock_peer_dump(mca_oob_usock_peer_t* peer, const char* msg); +ORTE_MODULE_DECLSPEC bool mca_oob_usock_peer_accept(mca_oob_usock_peer_t* peer); +ORTE_MODULE_DECLSPEC void mca_oob_usock_peer_complete_connect(mca_oob_usock_peer_t* peer); +ORTE_MODULE_DECLSPEC int mca_oob_usock_peer_recv_connect_ack(mca_oob_usock_peer_t* peer, + int sd, mca_oob_usock_hdr_t *hdr); +ORTE_MODULE_DECLSPEC void mca_oob_usock_peer_close(mca_oob_usock_peer_t *peer); + +#endif /* _MCA_OOB_USOCK_CONNECTION_H_ */ diff --git a/orte/mca/oob/usock/oob_usock_hdr.h b/orte/mca/oob/usock/oob_usock_hdr.h new file mode 100644 index 00000000000..765600ff819 --- /dev/null +++ b/orte/mca/oob/usock/oob_usock_hdr.h @@ -0,0 +1,55 @@ +/* + * Copyright (c) 2004-2007 The Trustees of Indiana University and Indiana + * University Research and Technology + * Corporation. All rights reserved. + * Copyright (c) 2004-2006 The University of Tennessee and The University + * of Tennessee Research Foundation. All rights + * reserved. + * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, + * University of Stuttgart. All rights reserved. + * Copyright (c) 2004-2005 The Regents of the University of California. + * All rights reserved. + * Copyright (c) 2006-2013 Los Alamos National Security, LLC. + * All rights reserved. + * Copyright (c) 2010-2011 Cisco Systems, Inc. All rights reserved. + * Copyright (c) 2013-2015 Intel, Inc. All rights reserved. + * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ + +#ifndef _MCA_OOB_USOCK_HDR_H_ +#define _MCA_OOB_USOCK_HDR_H_ + +#include "orte_config.h" + +/* define several internal-only message + * types this component uses for its own + * handshake operations, plus one indicating + * the message came from an external (to + * this component) source + */ +typedef enum { + MCA_OOB_USOCK_IDENT, + MCA_OOB_USOCK_PROBE, + MCA_OOB_USOCK_PING, + MCA_OOB_USOCK_USER +} mca_oob_usock_msg_type_t; + +/* header for usock msgs */ +typedef struct { + /* the original sender */ + orte_process_name_t origin; + /* the intended final recipient */ + orte_process_name_t dst; + /* type of message */ + mca_oob_usock_msg_type_t type; + /* the rml tag where this message is headed */ + orte_rml_tag_t tag; + /* number of bytes in message */ + uint32_t nbytes; +} mca_oob_usock_hdr_t; + +#endif /* _MCA_OOB_USOCK_HDR_H_ */ diff --git a/orte/mca/oob/usock/oob_usock_peer.h b/orte/mca/oob/usock/oob_usock_peer.h new file mode 100644 index 00000000000..cc715d4fdc8 --- /dev/null +++ b/orte/mca/oob/usock/oob_usock_peer.h @@ -0,0 +1,85 @@ +/* + * Copyright (c) 2004-2007 The Trustees of Indiana University and Indiana + * University Research and Technology + * Corporation. All rights reserved. + * Copyright (c) 2004-2006 The University of Tennessee and The University + * of Tennessee Research Foundation. All rights + * reserved. + * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, + * University of Stuttgart. All rights reserved. + * Copyright (c) 2004-2005 The Regents of the University of California. + * All rights reserved. + * Copyright (c) 2006-2013 Los Alamos National Security, LLC. + * All rights reserved. + * Copyright (c) 2010-2011 Cisco Systems, Inc. All rights reserved. + * Copyright (c) 2013-2015 Intel, Inc. All rights reserved. + * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ + +#ifndef _MCA_OOB_USOCK_PEER_H_ +#define _MCA_OOB_USOCK_PEER_H_ + +#include "orte_config.h" + +#include "oob_usock.h" +#include "oob_usock_sendrecv.h" + +/* object for tracking peers */ +typedef struct { + opal_list_item_t super; + /* although not required, there is enough debug + * value that retaining the name makes sense + */ + orte_process_name_t name; + char *auth_method; // how the peer authenticated themselves to use + int sd; + int retries; // number of times we have tried to connect to this address + mca_oob_usock_state_t state; + opal_event_t op_event; // used for connecting and operations other than read/write + opal_event_t send_event; /**< registration with event thread for send events */ + bool send_ev_active; + opal_event_t recv_event; /**< registration with event thread for recv events */ + bool recv_ev_active; + opal_event_t timer_event; /**< timer for retrying connection failures */ + bool timer_ev_active; + opal_list_t send_queue; /**< list of messages to send */ + mca_oob_usock_send_t *send_msg; /**< current send in progress */ + mca_oob_usock_recv_t *recv_msg; /**< current recv in progress */ +} mca_oob_usock_peer_t; +OBJ_CLASS_DECLARATION(mca_oob_usock_peer_t); + +typedef struct { + opal_object_t super; + opal_event_t ev; + mca_oob_usock_peer_t *peer; +} mca_oob_usock_peer_op_t; +OBJ_CLASS_DECLARATION(mca_oob_usock_peer_op_t); + +#define ORTE_ACTIVATE_USOCK_PEER_OP(p, cbfunc) \ + do { \ + mca_oob_usock_peer_op_t *op; \ + op = OBJ_NEW(mca_oob_usock_peer_op_t); \ + op->peer = (p); \ + opal_event_set(mca_usock_component.ev_base, &op->ev, -1, \ + OPAL_EV_WRITE, (cbfunc), op); \ + opal_event_set_priority(&op->ev, ORTE_MSG_PRI); \ + opal_event_active(&op->ev, OPAL_EV_WRITE, 1); \ + } while(0); + +#define ORTE_ACTIVATE_USOCK_CMP_OP(p, cbfunc) \ + do { \ + mca_oob_usock_peer_op_t *pop; \ + pop = OBJ_NEW(mca_oob_usock_peer_op_t); \ + pop->peer = (p); \ + opal_event_set(orte_event_base, &pop->ev, -1, \ + OPAL_EV_WRITE, (cbfunc), pop); \ + opal_event_set_priority(&pop->ev, ORTE_MSG_PRI); \ + opal_event_active(&pop->ev, OPAL_EV_WRITE, 1); \ + } while(0); + + +#endif /* _MCA_OOB_USOCK_PEER_H_ */ diff --git a/orte/mca/oob/usock/oob_usock_ping.h b/orte/mca/oob/usock/oob_usock_ping.h new file mode 100644 index 00000000000..67badb8f05e --- /dev/null +++ b/orte/mca/oob/usock/oob_usock_ping.h @@ -0,0 +1,52 @@ +/* + * Copyright (c) 2004-2007 The Trustees of Indiana University and Indiana + * University Research and Technology + * Corporation. All rights reserved. + * Copyright (c) 2004-2006 The University of Tennessee and The University + * of Tennessee Research Foundation. All rights + * reserved. + * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, + * University of Stuttgart. All rights reserved. + * Copyright (c) 2004-2005 The Regents of the University of California. + * All rights reserved. + * Copyright (c) 2006-2013 Los Alamos National Security, LLC. + * All rights reserved. + * Copyright (c) 2010-2011 Cisco Systems, Inc. All rights reserved. + * Copyright (c) 2013 Intel, Inc. All rights reserved. + * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ + +#ifndef _MCA_OOB_USOCK_PING_H_ +#define _MCA_OOB_USOCK_PING_H_ + +#include "orte_config.h" + +#include "opal/mca/event/event.h" + +#include "oob_usock.h" +#include "oob_usock_sendrecv.h" + +typedef struct { + opal_object_t super; + opal_event_t ev; + orte_process_name_t peer; +} mca_oob_usock_ping_t; +OBJ_CLASS_DECLARATION(mca_oob_usock_ping_t); + +#define ORTE_ACTIVATE_USOCK_PING(p, cbfunc) \ + do { \ + mca_oob_usock_ping_t *pop; \ + pop = OBJ_NEW(mca_oob_usock_ping_t); \ + pop->peer.jobid = (p)->jobid; \ + pop->peer.vpid = (p)->vpid; \ + opal_event_set(mca_oob_usock_module.ev_base, &pop->ev, -1, \ + OPAL_EV_WRITE, (cbfunc), pop); \ + opal_event_set_priority(&pop->ev, ORTE_MSG_PRI); \ + opal_event_active(&pop->ev, OPAL_EV_WRITE, 1); \ + } while(0); + +#endif /* _MCA_OOB_USOCK_PING_H_ */ diff --git a/orte/mca/oob/usock/oob_usock_sendrecv.c b/orte/mca/oob/usock/oob_usock_sendrecv.c new file mode 100644 index 00000000000..34e285a4498 --- /dev/null +++ b/orte/mca/oob/usock/oob_usock_sendrecv.c @@ -0,0 +1,608 @@ +/* + * Copyright (c) 2004-2010 The Trustees of Indiana University and Indiana + * University Research and Technology + * Corporation. All rights reserved. + * Copyright (c) 2004-2011 The University of Tennessee and The University + * of Tennessee Research Foundation. All rights + * reserved. + * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, + * University of Stuttgart. All rights reserved. + * Copyright (c) 2004-2005 The Regents of the University of California. + * All rights reserved. + * Copyright (c) 2006-2013 Los Alamos National Security, LLC. + * All rights reserved. + * Copyright (c) 2009 Cisco Systems, Inc. All rights reserved. + * Copyright (c) 2011 Oak Ridge National Labs. All rights reserved. + * Copyright (c) 2013-2015 Intel, Inc. All rights reserved. + * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + * + * In windows, many of the socket functions return an EWOULDBLOCK + * instead of \ things like EAGAIN, EINPROGRESS, etc. It has been + * verified that this will \ not conflict with other error codes that + * are returned by these functions \ under UNIX/Linux environments + */ + +#include "orte_config.h" + +#ifdef HAVE_UNISTD_H +#include +#endif +#include +#ifdef HAVE_SYS_UIO_H +#include +#endif +#ifdef HAVE_NET_UIO_H +#include +#endif +#ifdef HAVE_SYS_TYPES_H +#include +#endif +#include "opal/opal_socket_errno.h" +#ifdef HAVE_NETINET_IN_H +#include +#endif +#ifdef HAVE_ARPA_INET_H +#include +#endif +#ifdef HAVE_NETINET_TCP_H +#include +#endif + +#include "opal_stdint.h" +#include "opal/types.h" +#include "opal/mca/backtrace/backtrace.h" +#include "opal/util/output.h" +#include "opal/util/net.h" +#include "opal/util/error.h" +#include "opal/class/opal_hash_table.h" +#include "opal/mca/event/event.h" + +#include "orte/util/name_fns.h" +#include "orte/runtime/orte_globals.h" +#include "orte/mca/errmgr/errmgr.h" +#include "orte/mca/ess/ess.h" +#include "orte/mca/state/state.h" +#include "orte/runtime/orte_wait.h" + +#include "oob_usock.h" +#include "orte/mca/oob/usock/oob_usock_component.h" +#include "orte/mca/oob/usock/oob_usock_peer.h" +#include "orte/mca/oob/usock/oob_usock_connection.h" + +static int send_bytes(mca_oob_usock_peer_t* peer) +{ + mca_oob_usock_send_t* msg = peer->send_msg; + int rc; + + while (0 < msg->sdbytes) { + rc = write(peer->sd, msg->sdptr, msg->sdbytes); + if (rc < 0) { + if (opal_socket_errno == EINTR) { + continue; + } else if (opal_socket_errno == EAGAIN) { + /* tell the caller to keep this message on active, + * but let the event lib cycle so other messages + * can progress while this socket is busy + */ + return ORTE_ERR_RESOURCE_BUSY; + } else if (opal_socket_errno == EWOULDBLOCK) { + /* tell the caller to keep this message on active, + * but let the event lib cycle so other messages + * can progress while this socket is busy + */ + return ORTE_ERR_WOULD_BLOCK; + } + /* we hit an error and cannot progress this message */ + opal_output(0, "%s->%s mca_oob_usock_msg_send_bytes: write failed: %s (%d) [sd = %d]", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + ORTE_NAME_PRINT(&(peer->name)), + strerror(opal_socket_errno), + opal_socket_errno, + peer->sd); + return ORTE_ERR_COMM_FAILURE; + } + /* update location */ + msg->sdbytes -= rc; + msg->sdptr += rc; + } + /* we sent the full data block */ + return ORTE_SUCCESS; +} + +/* + * A file descriptor is available/ready for send. Check the state + * of the socket and take the appropriate action. + */ +void mca_oob_usock_send_handler(int sd, short flags, void *cbdata) +{ + mca_oob_usock_peer_t* peer = (mca_oob_usock_peer_t*)cbdata; + mca_oob_usock_send_t* msg = peer->send_msg; + int rc; + + opal_output_verbose(OOB_USOCK_DEBUG_CONNECT, orte_oob_base_framework.framework_output, + "%s usock:send_handler called to send to peer %s", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + ORTE_NAME_PRINT(&peer->name)); + + switch (peer->state) { + case MCA_OOB_USOCK_CONNECTING: + case MCA_OOB_USOCK_CLOSED: + opal_output_verbose(OOB_USOCK_DEBUG_CONNECT, orte_oob_base_framework.framework_output, + "%s usock:send_handler %s", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + mca_oob_usock_state_print(peer->state)); + mca_oob_usock_peer_complete_connect(peer); + /* de-activate the send event until the connection + * handshake completes + */ + if (peer->send_ev_active) { + opal_event_del(&peer->send_event); + peer->send_ev_active = false; + } + break; + case MCA_OOB_USOCK_CONNECTED: + opal_output_verbose(OOB_USOCK_DEBUG_CONNECT, orte_oob_base_framework.framework_output, + "%s usock:send_handler SENDING TO %s", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + (NULL == peer->send_msg) ? "NULL" : ORTE_NAME_PRINT(&peer->name)); + if (NULL != msg) { + /* if the header hasn't been completely sent, send it */ + if (!msg->hdr_sent) { + if (ORTE_SUCCESS == (rc = send_bytes(peer))) { + /* header is completely sent */ + msg->hdr_sent = true; + /* setup to send the data */ + if (NULL == msg->msg) { + /* this was a zero-byte msg - nothing more to do */ + OBJ_RELEASE(msg); + peer->send_msg = NULL; + goto next; + } else if (NULL != msg->msg->buffer) { + /* send the buffer data as a single block */ + msg->sdptr = msg->msg->buffer->base_ptr; + msg->sdbytes = msg->msg->buffer->bytes_used; + } else if (NULL != msg->msg->iov) { + /* start with the first iovec */ + msg->sdptr = msg->msg->iov[0].iov_base; + msg->sdbytes = msg->msg->iov[0].iov_len; + msg->iovnum = 0; + } else { + msg->sdptr = msg->msg->data; + msg->sdbytes = msg->msg->count; + } + /* fall thru and let the send progress */ + } else if (ORTE_ERR_RESOURCE_BUSY == rc || + ORTE_ERR_WOULD_BLOCK == rc) { + /* exit this event and let the event lib progress */ + return; + } else { + // report the error + opal_output(0, "%s-%s mca_oob_usock_peer_send_handler: unable to send header", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + ORTE_NAME_PRINT(&(peer->name))); + opal_event_del(&peer->send_event); + peer->send_ev_active = false; + msg->msg->status = rc; + ORTE_RML_SEND_COMPLETE(msg->msg); + OBJ_RELEASE(msg); + peer->send_msg = NULL; + goto next; + } + } + /* progress the data transmission */ + if (msg->hdr_sent) { + if (ORTE_SUCCESS == (rc = send_bytes(peer))) { + /* this block is complete */ + if (NULL != msg->msg->buffer) { + /* we are done - notify the RML */ + opal_output_verbose(2, orte_oob_base_framework.framework_output, + "%s MESSAGE SEND COMPLETE TO %s OF %d BYTES ON SOCKET %d", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + ORTE_NAME_PRINT(&(peer->name)), + msg->hdr.nbytes, peer->sd); + msg->msg->status = ORTE_SUCCESS; + ORTE_RML_SEND_COMPLETE(msg->msg); + OBJ_RELEASE(msg); + peer->send_msg = NULL; + } else if (NULL != msg->msg->data) { + /* this was a relay message - nothing more to do */ + opal_output_verbose(2, orte_oob_base_framework.framework_output, + "%s MESSAGE SEND COMPLETE TO %s OF %d BYTES ON SOCKET %d", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + ORTE_NAME_PRINT(&(peer->name)), + msg->hdr.nbytes, peer->sd); + OBJ_RELEASE(msg); + peer->send_msg = NULL; + } else { + /* rotate to the next iovec */ + msg->iovnum++; + if (msg->iovnum < msg->msg->count) { + msg->sdptr = msg->msg->iov[msg->iovnum].iov_base; + msg->sdbytes = msg->msg->iov[msg->iovnum].iov_len; + /* exit this event to give the event lib + * a chance to progress any other pending + * actions + */ + return; + } else { + /* this message is complete - notify the RML */ + opal_output_verbose(2, orte_oob_base_framework.framework_output, + "%s MESSAGE SEND COMPLETE TO %s OF %d BYTES ON SOCKET %d", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + ORTE_NAME_PRINT(&(peer->name)), + msg->hdr.nbytes, peer->sd); + msg->msg->status = ORTE_SUCCESS; + ORTE_RML_SEND_COMPLETE(msg->msg); + OBJ_RELEASE(msg); + peer->send_msg = NULL; + } + } + /* fall thru to queue the next message */ + } else if (ORTE_ERR_RESOURCE_BUSY == rc || + ORTE_ERR_WOULD_BLOCK == rc) { + /* exit this event and let the event lib progress */ + return; + } else { + // report the error + opal_output(0, "%s-%s mca_oob_usock_peer_send_handler: unable to send message ON SOCKET %d", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + ORTE_NAME_PRINT(&(peer->name)), peer->sd); + opal_event_del(&peer->send_event); + peer->send_ev_active = false; + msg->msg->status = rc; + ORTE_RML_SEND_COMPLETE(msg->msg); + OBJ_RELEASE(msg); + peer->send_msg = NULL; + ORTE_FORCED_TERMINATE(1); + return; + } + } + + next: + /* if current message completed - progress any pending sends by + * moving the next in the queue into the "on-deck" position. Note + * that this doesn't mean we send the message right now - we will + * wait for another send_event to fire before doing so. This gives + * us a chance to service any pending recvs. + */ + peer->send_msg = (mca_oob_usock_send_t*) + opal_list_remove_first(&peer->send_queue); + } + /* if nothing else to do unregister for send event notifications */ + if (NULL == peer->send_msg && peer->send_ev_active) { + opal_event_del(&peer->send_event); + peer->send_ev_active = false; + } + break; + default: + opal_output(0, "%s-%s mca_oob_usock_peer_send_handler: invalid connection state (%d) on socket %d", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + ORTE_NAME_PRINT(&(peer->name)), + peer->state, peer->sd); + if (peer->send_ev_active) { + opal_event_del(&peer->send_event); + peer->send_ev_active = false; + } + break; + } +} + +static int read_bytes(mca_oob_usock_peer_t* peer) +{ + int rc; + + /* read until all bytes recvd or error */ + while (0 < peer->recv_msg->rdbytes) { + rc = read(peer->sd, peer->recv_msg->rdptr, peer->recv_msg->rdbytes); + if (rc < 0) { + if(opal_socket_errno == EINTR) { + continue; + } else if (opal_socket_errno == EAGAIN) { + /* tell the caller to keep this message on active, + * but let the event lib cycle so other messages + * can progress while this socket is busy + */ + return ORTE_ERR_RESOURCE_BUSY; + } else if (opal_socket_errno == EWOULDBLOCK) { + /* tell the caller to keep this message on active, + * but let the event lib cycle so other messages + * can progress while this socket is busy + */ + return ORTE_ERR_WOULD_BLOCK; + } + /* we hit an error and cannot progress this message - report + * the error back to the RML and let the caller know + * to abort this message + */ + opal_output_verbose(OOB_USOCK_DEBUG_FAIL, orte_oob_base_framework.framework_output, + "%s-%s mca_oob_usock_msg_recv: readv failed: %s (%d)", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + ORTE_NAME_PRINT(&(peer->name)), + strerror(opal_socket_errno), + opal_socket_errno); + // mca_oob_usock_peer_close(peer); + // if (NULL != mca_oob_usock.oob_exception_callback) { + // mca_oob_usock.oob_exception_callback(&peer->name, ORTE_RML_PEER_DISCONNECTED); + //} + return ORTE_ERR_COMM_FAILURE; + } else if (rc == 0) { + /* the remote peer closed the connection - report that condition + * and let the caller know + */ + opal_output_verbose(OOB_USOCK_DEBUG_FAIL, orte_oob_base_framework.framework_output, + "%s-%s mca_oob_usock_msg_recv: peer closed connection", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + ORTE_NAME_PRINT(&(peer->name))); + /* stop all events */ + if (peer->recv_ev_active) { + opal_event_del(&peer->recv_event); + peer->recv_ev_active = false; + } + if (peer->timer_ev_active) { + opal_event_del(&peer->timer_event); + peer->timer_ev_active = false; + } + if (peer->send_ev_active) { + opal_event_del(&peer->send_event); + peer->send_ev_active = false; + } + if (NULL != peer->recv_msg) { + OBJ_RELEASE(peer->recv_msg); + peer->recv_msg = NULL; + } + mca_oob_usock_peer_close(peer); + //if (NULL != mca_oob_usock.oob_exception_callback) { + // mca_oob_usock.oob_exception_callback(&peer->peer_name, ORTE_RML_PEER_DISCONNECTED); + //} + return ORTE_ERR_WOULD_BLOCK; + } + /* we were able to read something, so adjust counters and location */ + peer->recv_msg->rdbytes -= rc; + peer->recv_msg->rdptr += rc; + } + + /* we read the full data block */ + return ORTE_SUCCESS; +} + +/* + * Dispatch to the appropriate action routine based on the state + * of the connection with the peer. + */ + +void mca_oob_usock_recv_handler(int sd, short flags, void *cbdata) +{ + mca_oob_usock_peer_t* peer = (mca_oob_usock_peer_t*)cbdata; + int rc; + orte_rml_send_t *snd; + + if (orte_abnormal_term_ordered) { + return; + } + + opal_output_verbose(OOB_USOCK_DEBUG_CONNECT, orte_oob_base_framework.framework_output, + "%s:usock:recv:handler called for peer %s", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + ORTE_NAME_PRINT(&peer->name)); + + switch (peer->state) { + case MCA_OOB_USOCK_CONNECT_ACK: + if (ORTE_SUCCESS == (rc = mca_oob_usock_peer_recv_connect_ack(peer, peer->sd, NULL))) { + opal_output_verbose(OOB_USOCK_DEBUG_CONNECT, orte_oob_base_framework.framework_output, + "%s:usock:recv:handler starting send/recv events", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)); + /* we connected! Start the send/recv events */ + if (!peer->recv_ev_active) { + opal_event_add(&peer->recv_event, 0); + peer->recv_ev_active = true; + } + if (peer->timer_ev_active) { + opal_event_del(&peer->timer_event); + peer->timer_ev_active = false; + } + /* if there is a message waiting to be sent, queue it */ + if (NULL == peer->send_msg) { + peer->send_msg = (mca_oob_usock_send_t*)opal_list_remove_first(&peer->send_queue); + } + if (NULL != peer->send_msg && !peer->send_ev_active) { + opal_event_add(&peer->send_event, 0); + peer->send_ev_active = true; + } + /* update our state */ + peer->state = MCA_OOB_USOCK_CONNECTED; + } else { + opal_output_verbose(OOB_USOCK_DEBUG_CONNECT, orte_oob_base_framework.framework_output, + "%s UNABLE TO COMPLETE CONNECT ACK WITH %s", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + ORTE_NAME_PRINT(&peer->name)); + opal_event_del(&peer->recv_event); + peer->recv_ev_active = false; + ORTE_FORCED_TERMINATE(1); + return; + } + break; + case MCA_OOB_USOCK_CONNECTED: + opal_output_verbose(OOB_USOCK_DEBUG_CONNECT, orte_oob_base_framework.framework_output, + "%s:usock:recv:handler CONNECTED", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)); + /* allocate a new message and setup for recv */ + if (NULL == peer->recv_msg) { + opal_output_verbose(OOB_USOCK_DEBUG_CONNECT, orte_oob_base_framework.framework_output, + "%s:usock:recv:handler allocate new recv msg", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)); + peer->recv_msg = OBJ_NEW(mca_oob_usock_recv_t); + if (NULL == peer->recv_msg) { + opal_output(0, "%s-%s mca_oob_usock_peer_recv_handler: unable to allocate recv message\n", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + ORTE_NAME_PRINT(&(peer->name))); + return; + } + /* start by reading the header */ + peer->recv_msg->rdptr = (char*)&peer->recv_msg->hdr; + peer->recv_msg->rdbytes = sizeof(mca_oob_usock_hdr_t); + } + /* if the header hasn't been completely read, read it */ + if (!peer->recv_msg->hdr_recvd) { + opal_output_verbose(OOB_USOCK_DEBUG_CONNECT, orte_oob_base_framework.framework_output, + "%s:usock:recv:handler read hdr", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)); + if (ORTE_SUCCESS == (rc = read_bytes(peer))) { + /* completed reading the header */ + peer->recv_msg->hdr_recvd = true; + /* if this is a zero-byte message, then we are done */ + if (0 == peer->recv_msg->hdr.nbytes) { + opal_output_verbose(OOB_USOCK_DEBUG_CONNECT, orte_oob_base_framework.framework_output, + "%s RECVD ZERO-BYTE MESSAGE FROM %s for tag %d", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + ORTE_NAME_PRINT(&peer->name), peer->recv_msg->hdr.tag); + peer->recv_msg->data = NULL; // make sure + peer->recv_msg->rdptr = NULL; + peer->recv_msg->rdbytes = 0; + } else { + opal_output_verbose(OOB_USOCK_DEBUG_CONNECT, orte_oob_base_framework.framework_output, + "%s:usock:recv:handler allocate data region of size %lu", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), (unsigned long)peer->recv_msg->hdr.nbytes); + /* allocate the data region */ + peer->recv_msg->data = (char*)malloc(peer->recv_msg->hdr.nbytes); + /* point to it */ + peer->recv_msg->rdptr = peer->recv_msg->data; + peer->recv_msg->rdbytes = peer->recv_msg->hdr.nbytes; + } + /* fall thru and attempt to read the data */ + } else if (ORTE_ERR_RESOURCE_BUSY == rc || + ORTE_ERR_WOULD_BLOCK == rc) { + /* exit this event and let the event lib progress */ + return; + } else { + /* close the connection */ + opal_output_verbose(OOB_USOCK_DEBUG_CONNECT, orte_oob_base_framework.framework_output, + "%s:usock:recv:handler error reading bytes - closing connection", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)); + mca_oob_usock_peer_close(peer); + return; + } + } + + if (peer->recv_msg->hdr_recvd) { + /* continue to read the data block - we start from + * wherever we left off, which could be at the + * beginning or somewhere in the message + */ + if (ORTE_SUCCESS == (rc = read_bytes(peer))) { + /* we recvd all of the message */ + opal_output_verbose(OOB_USOCK_DEBUG_CONNECT, orte_oob_base_framework.framework_output, + "%s RECVD COMPLETE MESSAGE FROM %s OF %d BYTES FOR DEST %s TAG %d", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + ORTE_NAME_PRINT(&peer->recv_msg->hdr.origin), + (int)peer->recv_msg->hdr.nbytes, + ORTE_NAME_PRINT(&peer->recv_msg->hdr.dst), + peer->recv_msg->hdr.tag); + /* am I the intended recipient? */ + if (peer->recv_msg->hdr.dst.jobid == ORTE_PROC_MY_NAME->jobid && + peer->recv_msg->hdr.dst.vpid == ORTE_PROC_MY_NAME->vpid) { + /* yes - post it to the RML for delivery */ + opal_output_verbose(OOB_USOCK_DEBUG_CONNECT, orte_oob_base_framework.framework_output, + "%s DELIVERING TO RML", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)); + ORTE_RML_POST_MESSAGE(&peer->recv_msg->hdr.origin, peer->recv_msg->hdr.tag, + peer->recv_msg->data, + peer->recv_msg->hdr.nbytes); + OBJ_RELEASE(peer->recv_msg); + } else { + /* no - we don't route things, so we promote this + * back to the OOB and let another transport move + * it along. If we are a daemon and it is intended + * for another of our local procs, it will just come + * back to us and be handled then + */ + snd = OBJ_NEW(orte_rml_send_t); + snd->dst = peer->recv_msg->hdr.dst; + snd->origin = peer->recv_msg->hdr.origin; + snd->tag = peer->recv_msg->hdr.tag; + snd->data = peer->recv_msg->data; + snd->count = peer->recv_msg->hdr.nbytes; + snd->cbfunc.iov = NULL; + snd->cbdata = NULL; + /* activate the OOB send state */ + ORTE_OOB_SEND(snd); + /* protect the data */ + peer->recv_msg->data = NULL; + /* cleanup */ + OBJ_RELEASE(peer->recv_msg); + return; + } + } else if (ORTE_ERR_RESOURCE_BUSY == rc || + ORTE_ERR_WOULD_BLOCK == rc) { + /* exit this event and let the event lib progress */ + return; + } else { + // report the error + opal_output(0, "%s-%s mca_oob_usock_peer_recv_handler: unable to recv message", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + ORTE_NAME_PRINT(&(peer->name))); + /* turn off the recv event */ + opal_event_del(&peer->recv_event); + peer->recv_ev_active = false; + ORTE_FORCED_TERMINATE(1); + return; + } + } + break; + default: + opal_output(0, "%s-%s mca_oob_usock_peer_recv_handler: invalid socket state(%d)", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + ORTE_NAME_PRINT(&(peer->name)), + peer->state); + // mca_oob_usock_peer_close(peer); + break; + } +} + +static void snd_cons(mca_oob_usock_send_t *ptr) +{ + ptr->msg = NULL; + ptr->data = NULL; + ptr->hdr_sent = false; + ptr->iovnum = 0; + ptr->sdptr = NULL; + ptr->sdbytes = 0; +} +/* we don't destruct any RML msg that is + * attached to our send as the RML owns + * that memory. However, if we relay a + * msg, the data in the relay belongs to + * us and must be free'd + */ +static void snd_des(mca_oob_usock_send_t *ptr) +{ + if (NULL != ptr->data) { + free(ptr->data); + } +} +OBJ_CLASS_INSTANCE(mca_oob_usock_send_t, + opal_list_item_t, + snd_cons, snd_des); + +static void rcv_cons(mca_oob_usock_recv_t *ptr) +{ + ptr->hdr_recvd = false; + ptr->rdptr = NULL; + ptr->rdbytes = 0; +} +OBJ_CLASS_INSTANCE(mca_oob_usock_recv_t, + opal_list_item_t, + rcv_cons, NULL); + +static void err_cons(mca_oob_usock_msg_error_t *ptr) +{ + ptr->rmsg = NULL; + ptr->snd = NULL; +} +OBJ_CLASS_INSTANCE(mca_oob_usock_msg_error_t, + opal_object_t, + err_cons, NULL); + diff --git a/orte/mca/oob/usock/oob_usock_sendrecv.h b/orte/mca/oob/usock/oob_usock_sendrecv.h new file mode 100644 index 00000000000..162de1db580 --- /dev/null +++ b/orte/mca/oob/usock/oob_usock_sendrecv.h @@ -0,0 +1,252 @@ +/* + * Copyright (c) 2004-2007 The Trustees of Indiana University and Indiana + * University Research and Technology + * Corporation. All rights reserved. + * Copyright (c) 2004-2006 The University of Tennessee and The University + * of Tennessee Research Foundation. All rights + * reserved. + * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, + * University of Stuttgart. All rights reserved. + * Copyright (c) 2004-2005 The Regents of the University of California. + * All rights reserved. + * Copyright (c) 2006-2013 Los Alamos National Security, LLC. + * All rights reserved. + * Copyright (c) 2010-2013 Cisco Systems, Inc. All rights reserved. + * Copyright (c) 2013-2015 Intel, Inc. All rights reserved. + * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ + +#ifndef _MCA_OOB_USOCK_SENDRECV_H_ +#define _MCA_OOB_USOCK_SENDRECV_H_ + +#include "orte_config.h" + +#include "opal/class/opal_list.h" + +#include "orte/mca/rml/base/base.h" + +#include "oob_usock.h" +#include "oob_usock_hdr.h" + +/* usock structure for sending a message */ +typedef struct { + opal_list_item_t super; + mca_oob_usock_hdr_t hdr; + orte_rml_send_t *msg; + char *data; + bool hdr_sent; + int iovnum; + char *sdptr; + size_t sdbytes; +} mca_oob_usock_send_t; +OBJ_CLASS_DECLARATION(mca_oob_usock_send_t); + +/* usock structure for recving a message */ +typedef struct { + opal_list_item_t super; + mca_oob_usock_hdr_t hdr; + bool hdr_recvd; + char *data; + char *rdptr; + size_t rdbytes; +} mca_oob_usock_recv_t; +OBJ_CLASS_DECLARATION(mca_oob_usock_recv_t); + +/* Queue a message to be sent to a specified peer. The macro + * checks to see if a message is already in position to be + * sent - if it is, then the message provided is simply added + * to the peer's message queue. If not, then the provided message + * is placed in the "ready" position + * + * If the provided boolean is true, then the send event for the + * peer is checked and activated if not already active. This allows + * the macro to either immediately send the message, or to queue + * it as "pending" for later transmission - e.g., after the + * connection procedure is completed + * + * p => pointer to mca_oob_usock_peer_t + * s => pointer to mca_oob_usock_send_t + * f => true if send event is to be activated + */ +#define MCA_OOB_USOCK_QUEUE_MSG(p, s, f) \ + do { \ + opal_output_verbose(5, orte_oob_base_framework.framework_output, \ + "%s:[%s:%d] queue msg to %s", \ + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), \ + __FILE__, __LINE__, \ + ORTE_NAME_PRINT(&((s)->hdr.dst))); \ + /* if there is no message on-deck, put this one there */ \ + if (NULL == (p)->send_msg) { \ + (p)->send_msg = (s); \ + } else { \ + /* add it to the queue */ \ + opal_list_append(&(p)->send_queue, &(s)->super); \ + } \ + if ((f)) { \ + /* if we aren't connected, then start connecting */ \ + if (MCA_OOB_USOCK_CONNECTED != (p)->state) { \ + (p)->state = MCA_OOB_USOCK_CONNECTING; \ + ORTE_ACTIVATE_USOCK_CONN_STATE((p), \ + mca_oob_usock_peer_try_connect); \ + } else { \ + /* ensure the send event is active */ \ + if (!(p)->send_ev_active) { \ + opal_event_add(&(p)->send_event, 0); \ + (p)->send_ev_active = true; \ + } \ + } \ + } \ + }while(0); + +/* queue a message to be sent by one of our modules - must + * provide the following params: + * + * m - the RML message to be sent + * p - the final recipient + */ +#define MCA_OOB_USOCK_QUEUE_SEND(m, p) \ + do { \ + mca_oob_usock_send_t *msg; \ + int i; \ + opal_output_verbose(5, orte_oob_base_framework.framework_output, \ + "%s:[%s:%d] queue send to %s", \ + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), \ + __FILE__, __LINE__, \ + ORTE_NAME_PRINT(&((m)->dst))); \ + msg = OBJ_NEW(mca_oob_usock_send_t); \ + /* setup the header */ \ + msg->hdr.origin = (m)->origin; \ + msg->hdr.dst = (m)->dst; \ + msg->hdr.type = MCA_OOB_USOCK_USER; \ + msg->hdr.tag = (m)->tag; \ + /* point to the actual message */ \ + msg->msg = (m); \ + /* set the total number of bytes to be sent */ \ + if (NULL != (m)->buffer) { \ + msg->hdr.nbytes = (m)->buffer->bytes_used; \ + } else if (NULL != (m)->iov) { \ + msg->hdr.nbytes = 0; \ + for (i=0; i < (m)->count; i++) { \ + msg->hdr.nbytes += (m)->iov[i].iov_len; \ + } \ + } else { \ + msg->hdr.nbytes = (m)->count; \ + } \ + /* start the send with the header */ \ + msg->sdptr = (char*)&msg->hdr; \ + msg->sdbytes = sizeof(mca_oob_usock_hdr_t); \ + /* add to the msg queue for this peer */ \ + MCA_OOB_USOCK_QUEUE_MSG((p), msg, true); \ + }while(0); + +/* queue a message to be sent by one of our modules upon completing + * the connection process - must provide the following params: + * + * m - the RML message to be sent + * p - the final recipient + */ +#define MCA_OOB_USOCK_QUEUE_PENDING(m, p) \ + do { \ + mca_oob_usock_send_t *msg; \ + int i; \ + opal_output_verbose(5, orte_oob_base_framework.framework_output, \ + "%s:[%s:%d] queue pending to %s", \ + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), \ + __FILE__, __LINE__, \ + ORTE_NAME_PRINT(&((m)->dst))); \ + msg = OBJ_NEW(mca_oob_usock_send_t); \ + /* setup the header */ \ + msg->hdr.origin = (m)->origin; \ + msg->hdr.dst = (m)->dst; \ + msg->hdr.type = MCA_OOB_USOCK_USER; \ + msg->hdr.tag = (m)->tag; \ + /* point to the actual message */ \ + msg->msg = (m); \ + /* set the total number of bytes to be sent */ \ + if (NULL != (m)->buffer) { \ + msg->hdr.nbytes = (m)->buffer->bytes_used; \ + } else if (NULL != (m)->iov) { \ + msg->hdr.nbytes = 0; \ + for (i=0; i < (m)->count; i++) { \ + msg->hdr.nbytes += (m)->iov[i].iov_len; \ + } \ + } else { \ + msg->hdr.nbytes = (m)->count; \ + } \ + /* start the send with the header */ \ + msg->sdptr = (char*)&msg->hdr; \ + msg->sdbytes = sizeof(mca_oob_usock_hdr_t); \ + /* add to the msg queue for this peer */ \ + MCA_OOB_USOCK_QUEUE_MSG((p), msg, false); \ + }while(0); + +/* State machine for processing message */ +typedef struct { + opal_object_t super; + opal_event_t ev; + int reps; + orte_rml_send_t *msg; +} mca_oob_usock_msg_op_t; +OBJ_CLASS_DECLARATION(mca_oob_usock_msg_op_t); + +#define ORTE_ACTIVATE_USOCK_POST_SEND(ms, cbfunc) \ + do { \ + mca_oob_usock_msg_op_t *mop; \ + opal_output_verbose(5, orte_oob_base_framework.framework_output, \ + "%s:[%s:%d] post send to %s", \ + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), \ + __FILE__, __LINE__, \ + ORTE_NAME_PRINT(&((ms)->dst))); \ + mop = OBJ_NEW(mca_oob_usock_msg_op_t); \ + mop->msg = (ms); \ + opal_event_set(mca_oob_usock_module.ev_base, &mop->ev, -1, \ + OPAL_EV_WRITE, (cbfunc), mop); \ + opal_event_set_priority(&mop->ev, ORTE_MSG_PRI); \ + opal_event_active(&mop->ev, OPAL_EV_WRITE, 1); \ + } while(0); + +typedef struct { + opal_object_t super; + opal_event_t ev; + orte_rml_send_t *rmsg; + mca_oob_usock_send_t *snd; + orte_process_name_t hop; +} mca_oob_usock_msg_error_t; +OBJ_CLASS_DECLARATION(mca_oob_usock_msg_error_t); + +/* macro for reporting delivery errors back to the + * component for error handling + * + * s -> mca_oob_usock_send_t that failed (can be NULL) + * r -> orte_rml_send_t that failed (can be NULL) + * h -> process name for the next recipient + * cbfunc -> function to handle the callback + */ +#define ORTE_ACTIVATE_USOCK_MSG_ERROR(s, r, h, cbfunc) \ + do { \ + mca_oob_usock_msg_error_t *mop; \ + opal_output_verbose(5, orte_oob_base_framework.framework_output, \ + "%s:[%s:%d] post msg error to %s", \ + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), \ + __FILE__, __LINE__, \ + ORTE_NAME_PRINT((h))); \ + mop = OBJ_NEW(mca_oob_usock_msg_error_t); \ + if (NULL != (s)) { \ + mop->snd = (s); \ + } else if (NULL != (r)) { \ + /* use a proxy so we can pass NULL into the macro */ \ + mop->rmsg = (r); \ + } \ + mop->hop.jobid = (h)->jobid; \ + mop->hop.vpid = (h)->vpid; \ + opal_event_set(orte_event_base, &mop->ev, -1, \ + OPAL_EV_WRITE, (cbfunc), mop); \ + opal_event_set_priority(&mop->ev, ORTE_MSG_PRI); \ + opal_event_active(&mop->ev, OPAL_EV_WRITE, 1); \ + } while(0); + +#endif /* _MCA_OOB_USOCK_SENDRECV_H_ */ diff --git a/orte/mca/oob/usock/owner.txt b/orte/mca/oob/usock/owner.txt new file mode 100644 index 00000000000..4ad6f408ca3 --- /dev/null +++ b/orte/mca/oob/usock/owner.txt @@ -0,0 +1,7 @@ +# +# owner/status file +# owner: institution that is responsible for this package +# status: e.g. active, maintenance, unmaintained +# +owner: INTEL +status: maintenance diff --git a/orte/mca/plm/base/plm_base_launch_support.c b/orte/mca/plm/base/plm_base_launch_support.c index 566605851b3..78c8b266d55 100644 --- a/orte/mca/plm/base/plm_base_launch_support.c +++ b/orte/mca/plm/base/plm_base_launch_support.c @@ -670,10 +670,7 @@ void orte_plm_base_post_launch(int fd, short args, void *cbdata) return; } - cleanup: - /* need to init_after_spawn for debuggers */ - ORTE_ACTIVATE_JOB_STATE(jdata, ORTE_JOB_STATE_READY_FOR_DEBUGGERS); - + cleanup: /* cleanup */ OBJ_RELEASE(caddy); } diff --git a/orte/mca/rml/rml_types.h b/orte/mca/rml/rml_types.h index 05ec63a92af..39ef292f2fb 100644 --- a/orte/mca/rml/rml_types.h +++ b/orte/mca/rml/rml_types.h @@ -155,8 +155,17 @@ BEGIN_C_DECLS /* confirm spawn by tool */ #define ORTE_RML_TAG_CONFIRM_SPAWN 53 -#define ORTE_RML_TAG_MAX 100 +/*** QOS specific RML TAGS ***/ +#define ORTE_RML_TAG_OPEN_CHANNEL_REQ 54 +#define ORTE_RML_TAG_OPEN_CHANNEL_RESP 55 +#define ORTE_RML_TAG_MSG_ACK 56 +#define ORTE_RML_TAG_CLOSE_CHANNEL_REQ 57 +#define ORTE_RML_TAG_CLOSE_CHANNEL_ACCEPT 58 + +/* error notifications */ +#define ORTE_RML_TAG_NOTIFICATION 59 +#define ORTE_RML_TAG_MAX 100 #define ORTE_RML_TAG_NTOH(t) ntohl(t) #define ORTE_RML_TAG_HTON(t) htonl(t) diff --git a/orte/tools/orterun/orterun.c b/orte/tools/orterun/orterun.c index e86b79e1d5a..e90f4602d83 100644 --- a/orte/tools/orterun/orterun.c +++ b/orte/tools/orterun/orterun.c @@ -2341,8 +2341,8 @@ static void orte_debugger_init_before_spawn(orte_job_t *jdata) return; } strncpy(MPIR_attach_fifo, attach_fifo, MPIR_MAX_PATH_LENGTH - 1); - free(attach_fifo); - open_fifo(); + free(attach_fifo); + open_fifo(); } return; } @@ -2487,7 +2487,7 @@ void orte_debugger_init_after_spawn(int fd, short event, void *cbdata) orte_app_context_t *appctx; orte_vpid_t i, j; opal_buffer_t *buf; - int rc, k; + int rc; char **aliases, *aptr; /* if we couldn't get thru the mapper stage, we might @@ -2507,30 +2507,18 @@ void orte_debugger_init_after_spawn(int fd, short event, void *cbdata) /* trigger the debugger */ MPIR_Breakpoint(); - /* send a message to rank=0 of any app jobs to release it */ - for (k=1; k < orte_job_data->size; k++) { - if (NULL == (jdata = (orte_job_t*)opal_pointer_array_get_item(orte_job_data, k))) { - continue; - } - if (ORTE_FLAG_TEST(jdata, ORTE_JOB_FLAG_DEBUGGER_DAEMON)) { - /* ignore debugger jobs */ - continue; - } - if (NULL == (proc = (orte_proc_t*)opal_pointer_array_get_item(jdata->procs, 0)) || - ORTE_PROC_STATE_UNTERMINATED < proc->state || - NULL == proc->rml_uri) { - /* proc is already dead or never registered with us (so we don't have - * contact info for him) - */ - continue; - } - buf = OBJ_NEW(opal_buffer_t); /* don't need anything in this */ - if (0 > (rc = orte_rml.send_buffer_nb(&proc->name, buf, - ORTE_RML_TAG_DEBUGGER_RELEASE, - orte_rml_send_callback, NULL))) { - opal_output(0, "Error: could not send debugger release to MPI procs - error %s", ORTE_ERROR_NAME(rc)); - OBJ_RELEASE(buf); - } + /* send a message to rank=0 to release it */ + if (NULL == (proc = (orte_proc_t*)opal_pointer_array_get_item(jdata->procs, 0)) || + ORTE_PROC_STATE_UNTERMINATED < proc->state ) { + /* proc is already dead */ + return; + } + buf = OBJ_NEW(opal_buffer_t); /* don't need anything in this */ + if (0 > (rc = orte_rml.send_buffer_nb(&proc->name, buf, + ORTE_RML_TAG_DEBUGGER_RELEASE, + orte_rml_send_callback, NULL))) { + opal_output(0, "Error: could not send debugger release to MPI procs - error %s", ORTE_ERROR_NAME(rc)); + OBJ_RELEASE(buf); } } return; @@ -2612,7 +2600,8 @@ void orte_debugger_init_after_spawn(int fd, short event, void *cbdata) /* if we are being launched under a debugger, then we must wait * for it to be ready to go and do some things to start the job */ - if (MPIR_being_debugged || NULL != orte_debugger_test_daemon) { + if (MPIR_being_debugged || NULL != orte_debugger_test_daemon || + NULL != getenv("ORTE_TEST_DEBUGGER_ATTACH")) { /* if we are not launching debugger daemons, then trigger * the debugger - otherwise, we need to wait for the debugger * daemons to be started @@ -2624,34 +2613,24 @@ void orte_debugger_init_after_spawn(int fd, short event, void *cbdata) /* trigger the debugger */ MPIR_Breakpoint(); - /* send a message to rank=0 of any app jobs to release it */ - for (k=1; k < orte_job_data->size; k++) { - if (NULL == (jdata = (orte_job_t*)opal_pointer_array_get_item(orte_job_data, k))) { - continue; - } - if (ORTE_FLAG_TEST(jdata, ORTE_JOB_FLAG_DEBUGGER_DAEMON)) { - /* ignore debugger jobs */ - continue; - } - if (NULL == (proc = (orte_proc_t*)opal_pointer_array_get_item(jdata->procs, 0)) || - ORTE_PROC_STATE_UNTERMINATED < proc->state || - NULL == proc->rml_uri) { - /* proc is already dead or never registered with us (so we don't have - * contact info for him) - */ - continue; - } - opal_output_verbose(2, orte_debug_output, - "%s sending debugger release to %s", - ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - ORTE_NAME_PRINT(&proc->name)); - buf = OBJ_NEW(opal_buffer_t); /* don't need anything in this */ - if (0 > (rc = orte_rml.send_buffer_nb(&proc->name, buf, - ORTE_RML_TAG_DEBUGGER_RELEASE, - orte_rml_send_callback, NULL))) { - opal_output(0, "Error: could not send debugger release to MPI procs - error %s", ORTE_ERROR_NAME(rc)); - OBJ_RELEASE(buf); - } + /* send a message to rank=0 to release it */ + if (NULL == (proc = (orte_proc_t*)opal_pointer_array_get_item(jdata->procs, 0)) || + ORTE_PROC_STATE_UNTERMINATED < proc->state) { + /* proc is already dead or never registered with us (so we don't have + * contact info for him) + */ + return; + } + opal_output_verbose(2, orte_debug_output, + "%s sending debugger release to %s", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + ORTE_NAME_PRINT(&proc->name)); + buf = OBJ_NEW(opal_buffer_t); /* don't need anything in this */ + if (0 > (rc = orte_rml.send_buffer_nb(&proc->name, buf, + ORTE_RML_TAG_DEBUGGER_RELEASE, + orte_rml_send_callback, NULL))) { + opal_output(0, "Error: could not send debugger release to MPI procs - error %s", ORTE_ERROR_NAME(rc)); + OBJ_RELEASE(buf); } } else { /* if I am launching debugger daemons, then I need to do so now