Revert "Revert "Deflakey test advanced 9"" #35091

Closed
Changes from 2 commits
35 changes: 35 additions & 0 deletions src/ray/common/client_connection.cc
@@ -183,6 +183,41 @@ void ServerConnection::ReadBufferAsync(
});
}
}
void ServerConnection::AsyncWaitTerminated(std::function<void()> callback) {
Contributor

Can we write unit tests for this method? Seems like it is worth testing it (the logic is pretty complicated).

  // Different platforms have different behavior for sockets in asio
  // Wait for error and read until the end.
  if (status_ == ConnectionStatus::RUNNING) {
    socket_.async_wait(local_stream_socket::wait_type::wait_error,
                       [this, callback](auto) {
                         if (status_ != ConnectionStatus::TERMINATED) {
Contributor

Q: Isn't it possible this is returned "before" the connection is actually closed? It seems like this can return when it is simply ready to read the socket.
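
The distinction behind this question can be seen outside asio. The sketch below is a minimal illustration, assuming a POSIX system and plain `socketpair`/`read` rather than Ray's `ServerConnection`; `DrainUntilEof` and `DemoDrainAfterPeerExit` are hypothetical names. It shows that a socket becoming readable does not by itself mean the peer is gone: only `read()` returning 0 signals that the other end has closed. It also shows that draining discards any bytes that were still pending.

```cpp
#include <sys/socket.h>
#include <unistd.h>

#include <cassert>
#include <cerrno>

// Hypothetical helper, not part of Ray: discards everything readable on `fd`
// and returns the number of bytes thrown away once read() reports
// end-of-file (return value 0). Returns -1 if read() fails.
long DrainUntilEof(int fd) {
  char scratch[1024];
  long discarded = 0;
  for (;;) {
    ssize_t n = read(fd, scratch, sizeof(scratch));
    if (n > 0) {
      discarded += n;  // Data was readable, but the peer may still be alive.
    } else if (n == 0) {
      return discarded;  // EOF: the peer closed its end.
    } else if (errno != EINTR) {
      return -1;  // A real error, not just an interrupted call.
    }
  }
}

// Demo: the "worker" end writes 5 bytes and then closes its socket.
long DemoDrainAfterPeerExit() {
  int fds[2];
  if (socketpair(AF_UNIX, SOCK_STREAM, 0, fds) != 0) return -1;
  if (write(fds[1], "hello", 5) != 5) {
    close(fds[0]);
    close(fds[1]);
    return -1;
  }
  close(fds[1]);  // Simulates the worker process exiting.
  long discarded = DrainUntilEof(fds[0]);
  close(fds[0]);
  return discarded;
}
```

In this sketch the first `read()` succeeds with the pending bytes and only the second one reports EOF, so a single "readable" notification would not have been enough to conclude that the peer had exited.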

                           callback();
                           // Close the connection so it'll cancel all other operations
                           Close();
                         }
                       });
    status_ = ConnectionStatus::TERMINATING;
  }

  if (status_ == ConnectionStatus::TERMINATING) {
Contributor

Hmm, I'm not sure I understand this part correctly. I have a couple of questions here:

  1. Couldn't this code interfere with the existing message handler? If the connection hasn't actually been closed, this read can consume bytes that were supposed to be read by the raylet message handler, corrupting the data passed to that handler. If that happens, I'd expect an ungraceful failure.
  2. If status == TERMINATING, we have already guaranteed that the socket will be closed (because we close the connection after async_wait). In this case, do we need any additional logic? Wouldn't it be sufficient to make the method idempotent by adding if (status==TERMINATING) return;?
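
The idempotency idea in point 2 can be sketched with a toy state machine. This is only a model under stated assumptions: it has no sockets and no asio, `FakeConnection`, `SimulatePeerGone`, and `CountTerminationCallbacks` are hypothetical names, and the early return is the reviewer's suggestion rather than what the PR does (the PR issues another read when the status is already TERMINATING).

```cpp
#include <cassert>
#include <functional>
#include <utility>

// Toy model only: mirrors the three states from the diff, nothing else.
enum class ConnectionStatus { RUNNING = 0, TERMINATING = 1, TERMINATED = 2 };

class FakeConnection {
 public:
  // Registers `callback` on the first call. Calls made while the connection
  // is already TERMINATING or TERMINATED are no-ops (the suggested guard).
  void AsyncWaitTerminated(std::function<void()> callback) {
    if (status_ != ConnectionStatus::RUNNING) return;
    on_terminated_ = std::move(callback);
    status_ = ConnectionStatus::TERMINATING;
  }

  // Stands in for the socket reporting an error or end-of-file.
  void SimulatePeerGone() {
    if (status_ == ConnectionStatus::TERMINATED) return;
    status_ = ConnectionStatus::TERMINATED;
    if (on_terminated_) {
      auto cb = std::move(on_terminated_);
      on_terminated_ = nullptr;
      cb();
    }
  }

  ConnectionStatus status() const { return status_; }

 private:
  ConnectionStatus status_ = ConnectionStatus::RUNNING;
  std::function<void()> on_terminated_;
};

// Registers twice and signals termination twice; returns how often the
// first callback ran.
int CountTerminationCallbacks() {
  int fired = 0;
  FakeConnection conn;
  conn.AsyncWaitTerminated([&fired] { ++fired; });
  conn.AsyncWaitTerminated([&fired] { fired += 100; });  // Ignored by the guard.
  conn.SimulatePeerGone();
  conn.SimulatePeerGone();  // Already TERMINATED, so nothing happens.
  return fired;
}
```

One trade-off of this guard is that a second caller's callback is silently dropped, so it only fits if a single owner registers the termination callback.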

    // This is only used for buffer. Never read. So race condition is
    // not an issue here.
    static char buffer[1024];
    auto read_cb = [this, cb = callback](boost::system::error_code ec,
                                         std::size_t /* length */) mutable {
      if (ec) {
        if (status_ != ConnectionStatus::TERMINATED) {
          cb();
          // Close the connection so it'll cancel all other operations
          Close();
        }
      } else {
        AsyncWaitTerminated(std::move(cb));
      }
    };
    socket_.async_read_some(boost::asio::mutable_buffer(buffer, sizeof(buffer)),
                            std::move(read_cb));
  }
}

ray::Status ServerConnection::WriteMessage(int64_t type,
int64_t length,
12 changes: 7 additions & 5 deletions src/ray/common/client_connection.h
@@ -111,6 +111,7 @@ class ServerConnection : public std::enable_shared_from_this<ServerConnection> {
void Close() {
boost::system::error_code ec;
socket_.close(ec);
status_ = ConnectionStatus::TERMINATED;
}

/// Get the native handle of the socket.
@@ -125,11 +126,7 @@ class ServerConnection : public std::enable_shared_from_this<ServerConnection> {

std::string DebugString() const;

void AsyncWaitTerminated(std::function<void()> callback) {
// Async wait until the connection is disconnected.
socket_.async_wait(local_stream_socket::wait_type::wait_error,
[callback = std::move(callback)](auto) { callback(); });
}
void AsyncWaitTerminated(std::function<void()> callback);
Contributor

Can you add a docstring?

Contributor

Suggested change
- void AsyncWaitTerminated(std::function<void()> callback);
+ void CloseAndAsyncWaitTerminated(std::function<void()> callback);

Contributor Author

Actually, I don't believe we are actively closing it. The method passively waits until something bad happens. Maybe we shouldn't include "Close" in the name?

Contributor

I see. I thought we closed it because there is a Close() call inside this wait method (after async_wait).


protected:
/// A private constructor for a server connection.
@@ -144,6 +141,11 @@ class ServerConnection : public std::enable_shared_from_this<ServerConnection> {
std::function<void(const ray::Status &)> handler;
};

enum struct ConnectionStatus { RUNNING = 0, TERMINATING, TERMINATED };
Contributor

Suggested change
- enum struct ConnectionStatus { RUNNING = 0, TERMINATING, TERMINATED };
+ enum class ConnectionStatus { RUNNING = 0, TERMINATING = 1, TERMINATED = 2 };

just personal preference haha..
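
For reference, the two spellings in this suggestion declare the same kind of type: `enum struct` and `enum class` are interchangeable keywords for a scoped enumeration, and enumerators without an explicit value continue from the previous one. A small compile-time check, using the hypothetical names `ImplicitStatus` and `ExplicitStatus` so both forms can coexist in one file:

```cpp
#include <cassert>
#include <type_traits>

// Same enumerators as in the diff, once with implicit values and once with
// explicit ones. The type names are made up for this comparison.
enum struct ImplicitStatus { RUNNING = 0, TERMINATING, TERMINATED };
enum class ExplicitStatus { RUNNING = 0, TERMINATING = 1, TERMINATED = 2 };

// Implicit numbering continues from the previous enumerator.
static_assert(static_cast<int>(ImplicitStatus::TERMINATING) ==
                  static_cast<int>(ExplicitStatus::TERMINATING),
              "TERMINATING has the same value in both forms");
static_assert(static_cast<int>(ImplicitStatus::TERMINATED) ==
                  static_cast<int>(ExplicitStatus::TERMINATED),
              "TERMINATED has the same value in both forms");

// Both are scoped enums with the default underlying type (int).
static_assert(std::is_same<std::underlying_type<ImplicitStatus>::type,
                           std::underlying_type<ExplicitStatus>::type>::value,
              "same underlying type");

// Scoped enums need an explicit cast to be used as integers.
constexpr int ToInt(ImplicitStatus s) { return static_cast<int>(s); }
```

So the change is purely stylistic: the explicit values make the numbering visible at a glance, at the cost of having to keep them in sync by hand.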


/// Whether the connection is terminating.
ConnectionStatus status_ = ConnectionStatus::RUNNING;

/// The socket connection to the server.
local_stream_socket socket_;

13 changes: 0 additions & 13 deletions src/ray/raylet/node_manager.cc
@@ -1561,23 +1561,10 @@ void NodeManager::DisconnectClient(const std::shared_ptr<ClientConnection> &clie

local_task_manager_->ClearWorkerBacklog(worker->WorkerId());
cluster_task_manager_->CancelTaskForOwner(worker->GetAssignedTaskId());
#ifdef _WIN32
// On Windows, when the worker is killed, client async wait won't get notified
// somehow.
RAY_CHECK_OK(
gcs_client_->Workers().AsyncReportWorkerFailure(worker_failure_data_ptr, nullptr));
client->Close();
#else
// ReportWorkerFailure should happen after the worker exit completely.
// A better way is to monitor the pid exit. But that needs Process.h
// support async operation.
// Here we monitor the socket to achieve similar result.
// When the worker exited, the pid will be disconnected (local stream socket).
client->AsyncWaitTerminated([client, worker_failure_data_ptr, this] {
RAY_CHECK_OK(gcs_client_->Workers().AsyncReportWorkerFailure(worker_failure_data_ptr,
nullptr));
});
#endif
// TODO(rkn): Tell the object manager that this client has disconnected so
// that it can clean up the wait requests for this client. Currently I think
// these can be leaked.