diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index d10d6d390f18..032d3f66707a 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -312,7 +312,7 @@ void NodeManager::Heartbeat() { } void NodeManager::ClientAdded(const ClientTableDataT &client_data) { - ClientID client_id = ClientID::from_binary(client_data.client_id); + const ClientID client_id = ClientID::from_binary(client_data.client_id); RAY_LOG(DEBUG) << "[ClientAdded] received callback from client id " << client_id; if (client_id == gcs_client_->client_table().GetLocalClientId()) { @@ -334,21 +334,34 @@ void NodeManager::ClientAdded(const ClientTableDataT &client_data) { return; } - ResourceSet resources_total(client_data.resources_total_label, - client_data.resources_total_capacity); - this->cluster_resource_map_.emplace(client_id, SchedulingResources(resources_total)); - // Establish a new NodeManager connection to this GCS client. auto client_info = gcs_client_->client_table().GetClient(client_id); - RAY_LOG(DEBUG) << "[ClientAdded] CONNECTING TO: " - << " " << client_info.node_manager_address << " " + RAY_LOG(DEBUG) << "[ClientAdded] Trying to connect to client " << client_id << " at " + << client_info.node_manager_address << ":" << client_info.node_manager_port; boost::asio::ip::tcp::socket socket(io_service_); - RAY_CHECK_OK(TcpConnect(socket, client_info.node_manager_address, - client_info.node_manager_port)); + auto status = + TcpConnect(socket, client_info.node_manager_address, client_info.node_manager_port); + // A disconnected client has 2 entries in the client table (one for being + // inserted and one for being removed). When a new raylet starts, ClientAdded + // will be called with the disconnected client's first entry, which will cause + // IOError and "Connection refused". + if (!status.ok()) { + RAY_LOG(WARNING) << "Failed to connect to client " << client_id + << " in ClientAdded. TcpConnect returned status: " + << status.ToString() << ". This may be caused by " + << "trying to connect to a node manager that has failed."; + return; + } + + // The client is connected. auto server_conn = TcpServerConnection(std::move(socket)); remote_server_connections_.emplace(client_id, std::move(server_conn)); + + ResourceSet resources_total(client_data.resources_total_label, + client_data.resources_total_capacity); + cluster_resource_map_.emplace(client_id, SchedulingResources(resources_total)); } void NodeManager::ClientRemoved(const ClientTableDataT &client_data) {