Revert "Revert "Deflakey test advanced 9"" #35091
Hmm, this doesn't work the same way on Linux. :(
Signed-off-by: Yi Cheng <[email protected]>
KillChildProcs();

// Disconnect here before KillChildProcs to make the Raylet async wait shorter.
Is this comment valid? It says "Disconnect here before KillChildProcs", but the code looks like we Disconnect "after" KillChildProcs.
KillChildProcs();
// Disconnect here after KillChildProcs to make the Raylet async wait shorter.
same comment as below
@@ -138,6 +141,11 @@ class ServerConnection : public std::enable_shared_from_this<ServerConnection> {
std::function<void(const ray::Status &)> handler;
};

enum struct ConnectionStatus { RUNNING = 0, TERMINATING, TERMINATED };
- enum struct ConnectionStatus { RUNNING = 0, TERMINATING, TERMINATED };
+ enum class ConnectionStatus { RUNNING = 0, TERMINATING = 1, TERMINATED = 2 };
just personal preference haha..
@@ -125,6 +126,8 @@ class ServerConnection : public std::enable_shared_from_this<ServerConnection> {

std::string DebugString() const;

void AsyncWaitTerminated(std::function<void()> callback);
Can you add a docstring?
@@ -125,6 +126,8 @@ class ServerConnection : public std::enable_shared_from_this<ServerConnection> {

std::string DebugString() const;

void AsyncWaitTerminated(std::function<void()> callback);
- void AsyncWaitTerminated(std::function<void()> callback);
+ void CloseAndAsyncWaitTerminated(std::function<void()> callback);
Actually, I believe we are not actively closing it. It passively waits until something bad happens. Maybe we shouldn't include "Close" in the name?
I see. I thought we closed it because there is a Close() call inside this Wait method (after async_wait).
status_ = ConnectionStatus::TERMINATING;
}

if (status_ == ConnectionStatus::TERMINATING) {
Hmm, I'm not sure I understand this part correctly. I have a couple of questions:
- Isn't it possible for this code to corrupt the existing message handler? For example, if the connection hasn't been closed, this could read from the socket that the raylet message handler is supposed to read, corrupting the data passed to that handler. If that happens, I suspect the result would be an ungraceful failure.
- If status == TERMINATING, we have already guaranteed that the socket will be closed (because we close the connection after async_wait). In this case, do we need any additional logic? Wouldn't it be sufficient to make the method idempotent by adding
if (status==TERMINATING) return;
?
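The early-return idea in the second question can be sketched without a real socket. This is only an illustration under stated assumptions: `FakeConnection` and `SimulateSocketError` are hypothetical stand-ins, not the PR's `ServerConnection`, and the socket error is triggered by hand instead of through `async_wait`.

```cpp
#include <cassert>
#include <functional>
#include <utility>

// Same three states as the enum under review.
enum class ConnectionStatus { RUNNING = 0, TERMINATING = 1, TERMINATED = 2 };

// Hypothetical stand-in for ServerConnection; no real socket is involved.
class FakeConnection {
 public:
  // Arms the termination wait. Only the first call has any effect; later
  // calls return early, which is the idempotent behavior suggested above.
  // Returns true if this call armed the wait.
  bool AsyncWaitTerminated(std::function<void()> callback) {
    if (status_ != ConnectionStatus::RUNNING) {
      return false;  // Already TERMINATING or TERMINATED: nothing to do.
    }
    status_ = ConnectionStatus::TERMINATING;
    on_terminated_ = std::move(callback);
    return true;
  }

  // Assumption: stands in for the completion of the wait on the socket.
  void SimulateSocketError() {
    if (status_ == ConnectionStatus::TERMINATED) {
      return;  // The callback must not fire twice.
    }
    status_ = ConnectionStatus::TERMINATED;
    if (on_terminated_) {
      on_terminated_();
    }
  }

  ConnectionStatus status() const { return status_; }

 private:
  ConnectionStatus status_ = ConnectionStatus::RUNNING;
  std::function<void()> on_terminated_;
};
```

With this shape, a second call while the status is TERMINATING never touches the socket, so it cannot compete with the message handler for reads.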
if (status_ == ConnectionStatus::RUNNING) {
  socket_.async_wait(local_stream_socket::wait_type::wait_error,
                     [this, callback](auto) {
                       if (status_ != ConnectionStatus::TERMINATED) {
Q: Isn't it possible that this returns "before" the connection is actually closed? It seems like this can return when the socket is simply ready to be read.
@@ -183,6 +183,41 @@ void ServerConnection::ReadBufferAsync(
});
}
}
void ServerConnection::AsyncWaitTerminated(std::function<void()> callback) {
Can we write unit tests for this method? Seems like it is worth testing it (the logic is pretty complicated).
I have some questions/concerns about reading the socket from both this new method and the original client.
Alternatively, what if we just handle the failure here?
ray/src/ray/common/client_connection.cc
Line 515 in c70331d
read_message_ = error_data;
So if the read fails and we call DisconnectClient, we call Close() inside client_connection.cc and invoke a callback (which reports the worker failure). We could also add a boolean like "is_disconnected" to make this idempotent.
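A minimal sketch of the "is_disconnected" flag, assuming two code paths (an explicit DisconnectClient and a failed read) can both end up requesting a disconnect. `FakeClient` and its callback are hypothetical names for illustration, not Ray's actual classes.

```cpp
#include <cassert>
#include <functional>
#include <utility>

// Hypothetical client wrapper; the names here are illustrative only.
class FakeClient {
 public:
  explicit FakeClient(std::function<void()> on_disconnect)
      : on_disconnect_(std::move(on_disconnect)) {}

  // Safe to call from both the DisconnectClient path and the read-failure
  // path: only the first call runs the callback.
  void Disconnect() {
    if (is_disconnected_) {
      return;
    }
    is_disconnected_ = true;
    if (on_disconnect_) {
      on_disconnect_();  // For example, report the worker failure.
    }
  }

  bool is_disconnected() const { return is_disconnected_; }

 private:
  std::function<void()> on_disconnect_;
  bool is_disconnected_ = false;
};
```

The flag is set before the callback runs, so a callback that re-enters `Disconnect()` also returns early.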
The issue here is that once we have received DisconnectClient, the draining part is no longer handled, and the draining is similar to this one. Maybe I can change the logic there and make the change simpler and easier to understand. Let me give it a try.
Sgtm! I scheduled a short meeting tomorrow to talk more about the draining part, which you said is no longer handled and is similar to this one.
Offline synced. I'm going to close this one and open two things:
Reverts #35090
async_wait doesn't behave the same way on Linux, macOS, and Windows, so this PR uses another method to monitor the connection.
Only updates are in the commit
The new method listens to two events:
And this should cover all platforms