Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Flink: Don't fail to serialize IcebergSourceSplit when there is too many delete files #9464
Flink: Don't fail to serialize IcebergSourceSplit when there is too many delete files #9464
Changes from 9 commits
6155e8b
066e4f1
ea5a425
7c0a441
6ffdbc9
318ee3c
1b5a26f
e7bd45b
a35514f
4f3eccf
File filter
Filter by extension
Conversations
Jump to
There are no files selected for viewing
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
resize
is already happening in the out.writeInt function, I don't think the following line is needed;https://github.com/apache/flink/pull/24191/files#diff-fe4433a7c83db13761a692e0d9eb4e7e3be7a511b7aa938bb1bcbb7b70fa45ccR290
The
resize
function is also a private function, so we don't have access to it in here.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
out.writeInt
only increases the buffer by 4 to hold the integer.I think we need the resize call here. but this does impose the blocker here because
DataInputDeserializer#resize(int minCapacityAdd)
is a private method here. I am not sure this can work (just copying the read/writeLongUTF`.cc @pvary @elkhand
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Wouldn't
out.write(bytearr, 0, count);
inwriteUTFBytes(out, str, (int) utflen);
call the resize for us?https://github.com/apache/flink/blob/f75935245799471ddf025d2bab0d0d212e79088e/flink-core/src/main/java/org/apache/flink/core/memory/DataOutputSerializer.java#L119
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@pvary you are right.
I guess the resize call from the Flink PR is not necessary then. this is where @javrasya had the question for this thread. but I guess @javrasya 's original comment is not 100% inaccurate.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Exactly, as @pvary said,
writeUTFBytes
will resize the the array as needed already. The only downside of not resizing it preliminary like in the Flink PR could be that there will be two resizing one when writingout.writeInt((int) utflen);
and thenout.write(bytearr, 0, count);
. This won't happen all the time, but if it happens to occur, it will cause original array to be copied to a new resized array twice by the resize function.As an improvement, maybe I can combine these two by turning
writeUTFBytes
intowriteUTFBytesWithSize
and write only once asout.write(bytearr, 0, count + 4)
(4
which is for the size) so that resizing of the actual byte array would be done once when needed. Then it will do exactly as in the Flink PR.Wdyt @pvary @stevenzwu ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am not sure that is necessary. with @pvary 's explanation, I think the current code is correct.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
With the current code, it is possible that it resizes twice due to these two lines. I am not saying that it would run wrongly but two resize operations would cause two array copy operation (when the size is not big enough) which is worse than one resize. Or am I missing something 🤔 ?
The early resize in the Flink PR kind of prevents that. Now we don't have such behavior in this PR due to the reasons mentioned earlier. My suggestion above would reduce the number of resize operation to 1 since we will write both the utf length and the utf bytes all in one go.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@javrasya: I would keep your code even with the possibility of the double resize.
This is only temp code until we get the Flink code available. If we do not see an actual bottleneck, I would keep the code as simple as possible.