Flink: Don't fail to serialize IcebergSourceSplit when there is too many delete files #9464
Conversation
@@ -147,7 +147,7 @@ byte[] serializeV2() throws IOException {
    for (FileScanTask fileScanTask : fileScanTasks) {
      String taskJson = FileScanTaskParser.toJson(fileScanTask);
-     out.writeUTF(taskJson);
+     writeBytes(out, taskJson);
The reason I don't use out.writeBytes here is that it is buggy. The buffer position is already incremented by 1 here (the actual place where the increment happens) and is then incremented by the whole size once again here, which makes it problematic.
So I took some inspiration from that code and introduced a custom version of it further down in the class.
for (int i = 0; i < s.length(); i++) {
  out.writeByte(s.charAt(i));
}
out.writeByte('\n');
The reason for this is that the deserialize method now uses readLine instead of readUTF (which no longer works here); that is the only way I could think of to still load the tasks one by one in an iterator fashion.
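For illustration, a minimal sketch of the line-based read path described above (names are hypothetical and it uses plain java.io.DataInput rather than Flink's DataInputDeserializer; not the exact PR code). Each task was written as one JSON string terminated by '\n', so the reader can pull the task JSON strings back one line at a time:

import java.io.DataInput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

class LineBasedReadSketch {
  static List<String> readTaskJsonLines(DataInput in) throws IOException {
    List<String> taskJsonList = new ArrayList<>();
    String line = in.readLine(); // returns null once the input is exhausted
    while (line != null) {
      taskJsonList.add(line);
      line = in.readLine();
    }
    return taskJsonList;
  }
}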
What happens if we have a comment with \n in it?
Let's take a step back before rushing to a solution. Here are some things we have to solve:
My current ideas:
Thanks for starting the work on this @javrasya!
Good catches @pvary, thank you. What if we take full inspiration from writeUTF and build our own writer that supports longer texts? By the way, the reason it limits the size to 65K max is that the first 2 bytes of the serialized value hold the length of the UTF string, and that is an unsigned short, which can be at most 65K. I have introduced my own writeUTF, called writeLongUTF/readLongUTF. It writes the leading bytes holding the length as an int, which is 4 bytes, instead of an unsigned short. That would be the most compatible (but still not fully compatible) approach with V2. Do you mind taking a look at those changes here and letting me know what you think? I didn't want to update this PR directly without talking to you about it. If you think it is a good idea, I can proceed and merge it on this branch first and we can continue the discussion here. The utilities in the Helper class are not exactly as they are in the Flink source, but very similar. However, it is not compatible with V2, since that uses the initial 2 bytes to indicate the length. Introducing V3 is a good idea, as you suggested, but I am not sure how we would distinguish a split serialized with V2 earlier from one serialized with V3 though 🤔 Do you know how this was done from V1 to V2? Can you help me there?
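For reference, a minimal sketch of the writeLongUTF/readLongUTF wire format being proposed: a 4-byte int length prefix instead of writeUTF's 2-byte unsigned short. This is simplified (standard UTF-8 via String.getBytes instead of writeUTF's modified UTF-8 encoding, and plain java.io interfaces instead of Flink's DataOutputSerializer/DataInputDeserializer), so it illustrates the format only, not the exact PR code:

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

class LongUtfSketch {
  static void writeLongUTF(DataOutput out, String str) throws IOException {
    byte[] utf = str.getBytes(StandardCharsets.UTF_8);
    out.writeInt(utf.length); // 4-byte length, so payloads over 65,535 bytes are fine
    out.write(utf);
  }

  static String readLongUTF(DataInput in) throws IOException {
    int length = in.readInt();
    byte[] utf = new byte[length];
    in.readFully(utf);
    return new String(utf, StandardCharsets.UTF_8);
  }
}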
Another idea is to still use the original approach but write the UTF data as chunks: main...javrasya:iceberg:issue-9410-introduce-v3-serde-write-utf-as-chunks. It would still break compatibility with V2 though. I don't know how that could be achieved without breaking it, since we don't know the version information in the serialized value of a split 🤔
Forget what I said about not knowing the version; I was looking at the code and saw that the version information is written when the enumerator serializes the data. I guess introducing V3 would be the way to go, so that wouldn't be a problem.
I would prefer creating a well-balanced writeLongUTF solution which would be a candidate to get into the Flink code later. I prefer your solution where you write out the length as an int instead of an unsigned short, but instead of reserving the whole byte array in advance, I would prefer writing out the data in 64k-sized byte arrays (a shorter one at the end). Not sure about the read path. Maybe there is no way to optimize the reserved memory at that point, as we have to keep the Strings in memory anyway... @stevenzwu: any suggestions?
I was also leaning toward that one @pvary. But what benefit do you think writing out the data in 64k-sized byte arrays would bring? DataOutputSerializer from Flink also writes into a byte array under the hood. It even resizes the byte array every time its size is exceeded, which is even more expensive than doing it only once (resizing does an arraycopy into a new, bigger byte array).
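To illustrate the cost being discussed, here is a generic grow-by-copy sketch (not Flink's actual DataOutputSerializer code, just the general pattern): every resize allocates a bigger array and copies the existing bytes over, which is why two resizes are worse than one.

class GrowByCopySketch {
  // Returns a larger buffer containing the old contents; the old array becomes garbage.
  static byte[] grow(byte[] buffer, int minCapacityAdd) {
    int newSize = Math.max(buffer.length * 2, buffer.length + minCapacityAdd);
    byte[] resized = new byte[newSize];
    System.arraycopy(buffer, 0, resized, 0, buffer.length);
    return resized;
  }
}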
I like Peter's suggestion of a proper writeLongUTF.
The split/FileScanTask should only contain ASCII. The other alternative is to get the byte array (UTF-8) from the String. I am not too concerned about the performance if this is only applied to strings longer than 64 KB.
I don't quite agree with the current implementation, as it writes 2 bytes for every char. It will double the size/bytes. We are hurting the common scenarios to address this corner case.
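A tiny illustration of the size concern (hypothetical path string, just to show the difference): UTF-8 stays at one byte per ASCII character, while writing each char as a two-byte value doubles the payload.

import java.nio.charset.StandardCharsets;

class EncodingSizeSketch {
  public static void main(String[] args) {
    String asciiPath = "data/delete-file-0001.parquet";                // 29 ASCII characters
    int utf8Size = asciiPath.getBytes(StandardCharsets.UTF_8).length;  // 29 bytes
    int twoBytesPerChar = asciiPath.length() * 2;                      // 58 bytes
    System.out.println(utf8Size + " bytes vs " + twoBytesPerChar + " bytes");
  }
}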
If I understand correctly, the doc field could still contain non-ASCII characters.
@pvary you are right. I agree that the doc field may contain non-ASCII chars.
Started the discussion on the Flink dev list: https://lists.apache.org/thread/ocm6cj0h8o3wbwo7fz2l1b4odss750rk. Let's see what the community thinks about adding this feature to Flink.
Is the conclusion to wait for them to provide that new utility, or to proceed and improve this PR according to our discussions?
Let's give the community a few more days to decide. Then we could proceed.
Created the FLINK jira: https://issues.apache.org/jira/browse/FLINK-34228
Also, here is the Flink PR: apache/flink#24191. Once the PR has been accepted there, the serialization format is finalized.
Do you want me to proceed and update my PR accordingly, @pvary?
I would wait at least until the PR is merged to the Flink main branch.
@javrasya: Finally the Flink PR got merged. See the related jira. So we have a clear path forward. The Flink serializer will be released only with Flink 1.20.0. In the meantime we can create our own method with the same write format, with a deprecation message saying it should be replaced with the Flink code once that is released. @javrasya: Do you have time to move this forward? Thanks,
Hi @pvary, Good idea. Let me have a look at it and update this PR.
@javrasya wanted to check if you will have time to update the PR this week. If not, I can take over and update this PR accordingly.
if (utflen > Integer.MAX_VALUE - 4) {
  throw new UTFDataFormatException("Encoded string is too long: " + utflen);
}
resize is already happening in the out.writeInt function, so I don't think the following line is needed. The resize function is also private, so we don't have access to it here.
out.writeInt only increases the buffer by 4 bytes to hold the integer.
I think we need the resize call here, but that poses a blocker because DataOutputSerializer#resize(int minCapacityAdd) is a private method. I am not sure this can work by just copying read/writeLongUTF.
if (utflen > Integer.MAX_VALUE - 4) {
  throw new UTFDataFormatException("Encoded string is too long: " + utflen);
} else if (this.position > this.buffer.length - utflen - 2) {
  resize((int) utflen + 4);
}
Wouldn't out.write(bytearr, 0, count);
in writeUTFBytes(out, str, (int) utflen);
call the resize for us?
@Override
public void write(byte[] b, int off, int len) throws IOException {
  if (len < 0 || off > b.length - len) {
    throw new ArrayIndexOutOfBoundsException();
  }
  if (this.position > this.buffer.length - len) {
    resize(len);
  }
  System.arraycopy(b, off, this.buffer, this.position, len);
  this.position += len;
}
@pvary you are right.
I guess the resize call from the Flink PR is not necessary then. This is where @javrasya had the question for this thread, but I guess @javrasya's original comment was not entirely inaccurate.
if (utflen > Integer.MAX_VALUE - 4) {
  throw new UTFDataFormatException("Encoded string is too long: " + utflen);
} else if (this.position > this.buffer.length - utflen - 2) {
  resize((int) utflen + 4);
}
Exactly, as @pvary said, writeUTFBytes will already resize the array as needed. The only downside of not resizing it up front like in the Flink PR is that there could be two resizes: one when writing out.writeInt((int) utflen) and another for out.write(bytearr, 0, count). This won't happen all the time, but when it does, the original array will be copied to a new, resized array twice by the resize function.
As an improvement, maybe I can combine these two by turning writeUTFBytes into writeUTFBytesWithSize and writing everything in one call as out.write(bytearr, 0, count + 4) (the extra 4 bytes are for the size), so that resizing of the actual byte array would be done at most once. Then it would behave exactly as in the Flink PR.
Wdyt @pvary @stevenzwu?
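A hypothetical sketch of the writeUTFBytesWithSize idea described above (names follow the suggestion, simplified to standard UTF-8, not merged code): staging the 4-byte length and the encoded bytes in one array means the underlying buffer is resized at most once by a single write call.

import java.io.DataOutput;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

class WriteUtfBytesWithSizeSketch {
  static void writeUTFBytesWithSize(DataOutput out, String str) throws IOException {
    byte[] utf = str.getBytes(StandardCharsets.UTF_8);
    int count = utf.length;
    byte[] bytearr = new byte[count + 4];
    // Big-endian length header, matching what out.writeInt would have written.
    bytearr[0] = (byte) ((count >>> 24) & 0xFF);
    bytearr[1] = (byte) ((count >>> 16) & 0xFF);
    bytearr[2] = (byte) ((count >>> 8) & 0xFF);
    bytearr[3] = (byte) (count & 0xFF);
    System.arraycopy(utf, 0, bytearr, 4, count);
    out.write(bytearr, 0, count + 4); // single write, so at most one downstream resize
  }
}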
As an improvement, maybe I can combine these two by turning writeUTFBytes into writeUTFBytesWithSize and write only once as out.write(bytearr, 0, count + 4) (4 which is for the size) so that resizing of the actual byte array would be done once when needed.
I am not sure that is necessary. With @pvary's explanation, I think the current code is correct.
With the current code, it is possible that it resizes twice due to these two lines. I am not saying it would run incorrectly, but two resize operations would cause two array copies (when the buffer is not big enough), which is worse than one resize. Or am I missing something 🤔?
The early resize in the Flink PR prevents that. We don't have such behavior in this PR for the reasons mentioned earlier. My suggestion above would reduce the number of resize operations to one, since we would write both the UTF length and the UTF bytes in one go.
@javrasya: I would keep your code even with the possibility of the double resize.
This is only temporary code until the Flink code becomes available. If we do not see an actual bottleneck, I would keep the code as simple as possible.
…too many delete files which are created due to an upsert operation on the table and then it is not possible to consume from such a table
…s V3 to allow smooth migration
…tion in the Flink code Ahmet
…duce code duplication
Force-pushed from 88fe53c to 1b5a26f
I've done some more refactorings.
@javrasya: Please fix the failures as well.
Done @pvary, I guess the triggers require an approval to kick in.
LGTM
@javrasya there is still a style check failure:
Execution failed for task ':iceberg-flink:iceberg-flink-1.18:checkstyleTest'.
> A failure occurred while executing org.gradle.api.plugins.quality.internal.CheckstyleAction
> Checkstyle rule violations were found. See the report at: file:///home/runner/work/iceberg/iceberg/flink/v1.18/flink/build/reports/checkstyle/test.html
Checkstyle files with violations: 1
Checkstyle violations by severity: [error:3]
Sorry for the inconvenience, I didn't know how to run the checks locally; now I know, and it is all good on my machine. Pushed the changes. 🙏 @stevenzwu
Thanks for the PR @javrasya, and @stevenzwu for the review! @javrasya: Please port the changes to v1.19 and v1.17. Thanks,
Is that done manually @pvary, or is there a utility in the repo to do that?
…h bigger payload
Co-authored-by: Elkhan Dadashov <[email protected]>
…any delete files (apache#9464)
Co-authored-by: Elkhan Dadashov <[email protected]>
The changes in this PR address the problem described in #9410, which makes it problematic to consume from an Iceberg table that receives regular upserts.
The problem occurs on this line with Flink's DataOutputSerializer when the serialized size of an object exceeds 65 KB, and that limit is easily exceeded for tables that accumulate many delete files due to regular upserts.
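A small standalone reproduction of the underlying limit, using java.io.DataOutputStream, whose writeUTF shares the 65,535-byte restriction that Flink's DataOutputSerializer.writeUTF hits here (the class and string below are illustrative, not taken from the PR):

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UTFDataFormatException;

class WriteUtfLimitDemo {
  public static void main(String[] args) throws IOException {
    StringBuilder sb = new StringBuilder();
    for (int i = 0; i < 70_000; i++) {
      sb.append('a'); // ASCII chars encode to 1 byte each in modified UTF-8
    }
    try (DataOutputStream out = new DataOutputStream(new ByteArrayOutputStream())) {
      out.writeUTF(sb.toString()); // encoded length 70,000 > 65,535
    } catch (UTFDataFormatException e) {
      System.out.println("Hit the writeUTF limit: " + e.getMessage());
    }
  }
}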