[core] Cleanup handling for nondeterministic object size during transfer #22639
Conversation
Nice! While this doesn't block this PR, I'm wondering if we may also see cases where the data size is the same but data contents differ across versions of the object.
I believe that can't happen since we currently stream object data from a single source (never re-using chunks), but we may want to add a random version / checksum of the object data to reject these cases as well in the future.
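For illustration, a minimal sketch of the checksum idea — not Ray's actual API (`Fnv1a` and `VerifyObject` are hypothetical names): the receiver records a checksum when the transfer starts and rejects received data whose bytes differ, even when the size matches.

```cpp
#include <cstdint>
#include <iostream>
#include <string>

// FNV-1a, used here only as a stand-in for a real checksum.
uint64_t Fnv1a(const std::string &data) {
  uint64_t h = 1469598103934665603ULL;
  for (unsigned char c : data) {
    h ^= c;
    h *= 1099511628211ULL;
  }
  return h;
}

// Two versions of a nondeterministic object can have the same size but
// different bytes; a checksum taken when the transfer begins lets the
// receiver reject data from a different version.
bool VerifyObject(const std::string &received, uint64_t expected_checksum) {
  return Fnv1a(received) == expected_checksum;
}

int main() {
  std::string v1 = "nondeterministic-output-A";
  std::string v2 = "nondeterministic-output-B";  // same size, different bytes
  uint64_t expected = Fnv1a(v1);
  std::cout << std::boolalpha << VerifyObject(v2, expected) << "\n";  // false
}
```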
```cpp
  RAY_CHECK_OK(store_client_.Connect(store_socket_name_.c_str(), "", 0, 300));
}
ObjectBufferPool::ObjectBufferPool(
    std::shared_ptr<plasma::PlasmaClientInterface> store_client, uint64_t chunk_size)
```
👍
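As an aside, a minimal sketch (with hypothetical, slimmed-down types, not Ray's actual interface) of why taking the store client as a `plasma::PlasmaClientInterface` helps: a fake client can stand in for the plasma store in unit tests, which is what enables the receiver-side tests this PR adds.

```cpp
#include <cstdint>
#include <iostream>
#include <memory>

// Hypothetical slimmed-down version of the injected interface.
class PlasmaClientInterface {
 public:
  virtual ~PlasmaClientInterface() = default;
  virtual bool Create(uint64_t object_size) = 0;
};

// Fake client for unit tests: no plasma store process required.
class FakePlasmaClient : public PlasmaClientInterface {
 public:
  bool Create(uint64_t object_size) override {
    last_created_size_ = object_size;
    return true;
  }
  uint64_t last_created_size_ = 0;
};

// Sketch of a pool that takes the client by interface, as in the diff above.
class ObjectBufferPool {
 public:
  ObjectBufferPool(std::shared_ptr<PlasmaClientInterface> store_client,
                   uint64_t chunk_size)
      : store_client_(std::move(store_client)), chunk_size_(chunk_size) {}

  bool CreateObject(uint64_t object_size) {
    return store_client_->Create(object_size);
  }

 private:
  std::shared_ptr<PlasmaClientInterface> store_client_;
  uint64_t chunk_size_;
};

int main() {
  auto fake = std::make_shared<FakePlasmaClient>();
  ObjectBufferPool pool(fake, /*chunk_size=*/64);
  pool.CreateObject(1024);
  std::cout << fake->last_created_size_ << "\n";  // 1024
}
```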
```cpp
  const int64_t object_size =
      static_cast<int64_t>(data_size) - static_cast<int64_t>(metadata_size);
  std::shared_ptr<Buffer> data;
  RAY_LOG(INFO) << "store_client_ " << store_client_;
```
Stray log?
Oops, thanks...
Yeah, I was thinking this as well; a version number would be good, and I think it would use pretty much the same codepath. By the way, it is actually possible to get chunks from different sources right now if a transfer fails midway through, or if pull retries are close enough together that they overlap. That also means it's possible to get liveness issues if this happens repeatedly, but I figured it's fine for now.
Ah, is this if there are two concurrent pushers to the same pull requester? That does sound problematic.
Yup, adding a unique version number (like a randomized UUID) should help us eliminate the same-size, different-contents case.
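A minimal sketch of the version-tag idea, assuming hypothetical names (`ObjectVersion`, `ChunkHeader`, and `AcceptChunk` are not Ray's actual API): each (re)creation of an object gets a fresh random version, and the receiver drops chunks tagged with a stale version even when the size matches.

```cpp
#include <cstdint>
#include <iostream>
#include <random>

// Hypothetical per-object version tag, regenerated whenever the object is
// (re)created, e.g. after lineage reconstruction.
using ObjectVersion = uint64_t;

ObjectVersion GenerateVersion() {
  static std::mt19937_64 rng{std::random_device{}()};
  return rng();
}

struct ChunkHeader {
  ObjectVersion version;  // version of the object this chunk belongs to
  uint64_t chunk_index;
};

// Receiver-side check: drop chunks from a stale version of the object even
// when the data size happens to match.
bool AcceptChunk(const ChunkHeader &header, ObjectVersion expected_version) {
  if (header.version != expected_version) {
    std::cerr << "Rejecting chunk " << header.chunk_index
              << " from stale object version\n";
    return false;
  }
  return true;
}

int main() {
  const ObjectVersion v1 = GenerateVersion();
  const ObjectVersion v2 = GenerateVersion();  // object was reconstructed
  ChunkHeader stale{v1, 3};
  AcceptChunk(stale, v2);  // rejected: same size, possibly different contents
}
```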
Why are these changes needed?
Currently object transfers assume that the object size is fixed. This is a bad assumption during failures, especially with lineage reconstruction enabled and tasks with nondeterministic outputs.
This PR cleans up the handling and hopefully guards against two cases where the object size may change during a transfer:
1. The object manager's size information does not match the object in the local plasma store (due to async notifications). --> The object manager overwrites its own information if it finds that the physical object has a different size.
2. The receiver's created buffer size does not match the sender's object size. --> The receiver destroys the previous buffer and creates a new buffer with the correct size (see the sketch below). This might cause some transient errors, but eventually the object transfer should succeed.
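For illustration, a minimal sketch of case 2's receiver-side handling, with hypothetical types (`CreateBuffer` and `HandleChunk` are not Ray's actual API): when a chunk advertises a different object size than the buffer created earlier, the stale buffer is dropped and a new one is created at the advertised size.

```cpp
#include <cstdint>
#include <iostream>
#include <memory>
#include <vector>

// Hypothetical stand-in for an in-progress create buffer; not Ray's type.
struct CreateBuffer {
  uint64_t object_size;
  std::vector<uint8_t> data;
};

// Case 2 from the description: if a chunk advertises a different object size
// than the buffer we already created, drop the stale buffer and restart at
// the new size. Progress so far is lost, but the transfer can then succeed.
void HandleChunk(std::unique_ptr<CreateBuffer> &buffer,
                 uint64_t advertised_size) {
  if (buffer && buffer->object_size != advertised_size) {
    std::cout << "Object size changed from " << buffer->object_size << " to "
              << advertised_size << "; recreating buffer\n";
    buffer.reset();  // previously received chunks are discarded
  }
  if (!buffer) {
    buffer.reset(new CreateBuffer{advertised_size,
                                  std::vector<uint8_t>(advertised_size)});
  }
}

int main() {
  std::unique_ptr<CreateBuffer> buffer;
  HandleChunk(buffer, 1024);  // first chunk creates the buffer
  HandleChunk(buffer, 2048);  // sender's object size changed mid-transfer
}
```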
Unfortunately I couldn't trigger this from Python because it depends on some pretty specific timing conditions. However, I did add some unit tests for case 2 (this is the majority of the PR).
Checks
- I've run scripts/format.sh to lint the changes in this PR.