ARROW-499: Update file serialization to use the streaming serialization format. #292
Conversation
-Assert.assertEquals(0, rbBlock.getMetadataLength() % 8);
+List<Integer> recordBatches = footer.getRecordBatches();
+for (Integer rbBlock : recordBatches) {
+  //Assert.assertEquals(0, rbBlock % 8);
The new serialized batches aren't aligned. What's the requirement/goal for alignment? This would be easy to update in MessageSerializer, but I'm not sure what the current desired behavior is.
8-byte alignment is sufficient.
What do you want aligned? The start of buffers? Various metadata pieces?
The serialized format is now:
message header with size prefix
record batch metadata header with size prefix
buffers
We can pad between any of them or none of them. Currently it's none of them.
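For concreteness, a minimal sketch of that layout in Java (helper and class names are hypothetical; the 4-byte little-endian size prefix matches the serializer comment quoted further down):

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.channels.WritableByteChannel;

public class StreamLayoutSketch {
  // Sketch of the current (unpadded) layout: message header with size
  // prefix, record batch metadata with size prefix, then the buffers.
  static void writeRecordBatch(WritableByteChannel out, ByteBuffer messageHeader,
      ByteBuffer batchMetadata, ByteBuffer[] buffers) throws IOException {
    writeSizePrefix(out, messageHeader.remaining());
    out.write(messageHeader.duplicate());
    writeSizePrefix(out, batchMetadata.remaining());
    out.write(batchMetadata.duplicate());
    for (ByteBuffer b : buffers) {
      out.write(b.duplicate()); // buffers back to back, no padding between them
    }
  }

  static void writeSizePrefix(WritableByteChannel out, int size) throws IOException {
    ByteBuffer prefix = ByteBuffer.allocate(4).order(ByteOrder.LITTLE_ENDIAN);
    prefix.putInt(size);
    prefix.flip();
    out.write(prefix);
  }
}
```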
Only the starts of buffers need to be aligned (the buffers themselves are already supposed to be padded), so we only need to add padding after the serialized metadata.
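A sketch of what that padding could look like, assuming we round the metadata length up to the next multiple of 8 before the buffers start (helper name hypothetical):

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;

public class AlignmentSketch {
  // Pad the stream so the next write (the first buffer) starts on an
  // 8-byte boundary; 'position' is the offset after the serialized metadata.
  static long alignTo8(WritableByteChannel out, long position) throws IOException {
    int padding = (int) (-position & 7); // bytes to the next multiple of 8
    if (padding > 0) {
      out.write(ByteBuffer.allocate(padding)); // zero-filled padding bytes
    }
    return position + padding;
  }
}
```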
// Byte offsets into the file for the start of record batch messages.
recordBatches: [ int ];
These offsets should be long / int64.
One downside of this change getting rid of the Blocks is that reading a record batch from the file will now require 3 reads from the source (length, metadata, then body) -- before, you could read the whole payload in 1 read if you wanted. Not sure how much that matters.
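For illustration, the three reads would look roughly like this (a sketch, not the PR's code; bodyLengthFrom stands in for parsing the body size out of the metadata flatbuffer):

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.channels.SeekableByteChannel;

public class ThreeReadSketch {
  static ByteBuffer[] readBatch(SeekableByteChannel in, long offset) throws IOException {
    in.position(offset);
    ByteBuffer len = ByteBuffer.allocate(4).order(ByteOrder.LITTLE_ENDIAN);
    in.read(len);                                   // read 1: the 4-byte length prefix
    len.flip();
    ByteBuffer metadata = ByteBuffer.allocate(len.getInt());
    in.read(metadata);                              // read 2: the record batch metadata
    metadata.flip();
    ByteBuffer body = ByteBuffer.allocate((int) bodyLengthFrom(metadata));
    in.read(body);                                  // read 3: the body (the buffers)
    body.flip();
    return new ByteBuffer[] { metadata, body };
  }

  // Hypothetical helper: in real code the body length comes out of the
  // parsed metadata flatbuffer.
  static long bodyLengthFrom(ByteBuffer metadata) {
    return 0L; // stub for the sketch
  }
}
```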
@@ -33,11 +30,11 @@
 private final Schema schema;
-private final List<ArrowBlock> dictionaries;
+private final List<Integer> dictionaries;
BigInteger?
Update file serialization to use the streaming serialization format. This removes the old serialization code and unifies the code paths. This also changes the serialization format to match the alignment requirements from the previous file format.
 ByteBuffer serializedHeader =
-    serializeHeader(MessageHeader.RecordBatch, bodyLength + metadata.remaining() + 4);
+    serializeHeader(MessageHeader.RecordBatch, messageLength);
The serialization would be quite a bit simpler if we enforced that Message was also fixed length. It would be similarly easy if we enforced that the current fields in Message always came first.
i.e. read(sizeof(Message)) was always going to be valid, and if the header grew, you could tell by looking at header.version.
Repeating my comment from Slack here for the others: the message.header could be arbitrarily large, so are you saying that the new fixed-size message would contain the version, message type (as an enum), metadata length, and body length, with padding that makes 16 bytes, I guess?
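A sketch of that hypothetical fixed 16-byte prefix (the field widths here are assumptions for illustration, not something agreed on in this thread):

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

public class FixedPrefixSketch {
  // Hypothetical fixed-size prefix, 2 + 2 + 4 + 8 = 16 bytes, little-endian:
  // version | message type enum | metadata length | body length
  static ByteBuffer encodePrefix(short version, short messageType,
      int metadataLength, long bodyLength) {
    ByteBuffer prefix = ByteBuffer.allocate(16).order(ByteOrder.LITTLE_ENDIAN);
    prefix.putShort(version);
    prefix.putShort(messageType);
    prefix.putInt(metadataLength);
    prefix.putLong(bodyLength);
    prefix.flip();
    return prefix;
  }
}
```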
I think I'm confused. I'm referring to this:
https://github.com/apache/arrow/blob/master/format/Message.fbs#L276
Is this expected to be arbitrarily large?
The header union could be arbitrarily large, since it grows with the number of fields in the record batch or schema.
Hmm, this is not what I'm interpreting the union to mean. I'm interpreting it to basically mean an enum, which could grow, I guess, if there were many more message types. It doesn't contain the thing inside the union.
https://google.github.io/flatbuffers/md__schemas.html
I've very little experience with flatbufs though. Am I totally off?
Yeah, the object you put in the union must be appended as part of constructing the root flatbuffer. See for example:
https://github.com/apache/arrow/blob/master/cpp/src/arrow/ipc/metadata-internal.cc#L312
When you read the Flatbuffer in C++, it gives you the functions header_type() (which gives you the enum) and header(), which is a const void* that you can cast to the correct Flatbuffer type (e.g. reinterpret_cast<const flatbuf::Schema*>) depending on the value of the enum.
The Flatbuffers documentation around unions is not very good; I remember it took me a while to figure out.
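The flatc-generated Java API is analogous; a sketch of reading the union there, assuming the classes generated from Message.fbs:

```java
import java.nio.ByteBuffer;
import org.apache.arrow.flatbuf.Message;
import org.apache.arrow.flatbuf.MessageHeader;
import org.apache.arrow.flatbuf.RecordBatch;

public class UnionReadSketch {
  // headerType() returns the union discriminant; header(Table) populates
  // the supplied table with the union payload, which we then cast.
  static RecordBatch readRecordBatchHeader(ByteBuffer serializedMessage) {
    Message message = Message.getRootAsMessage(serializedMessage);
    if (message.headerType() != MessageHeader.RecordBatch) {
      throw new IllegalStateException("expected a RecordBatch message");
    }
    return (RecordBatch) message.header(new RecordBatch());
  }
}
```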
OK, I see what you did in the Java code, and why we are confused. You are writing the embedded message header separately, not actually storing the data in the union. So if I looked at the Message in your stream, the header field would have the header union enum set properly, but the union data payload would be nullptr.
arrow/java/vector/src/main/java/org/apache/arrow/vector/stream/MessageSerializer.java, line 116 in 6811d3f:
// Write the metadata, with the 4 byte little endian prefix
Here is what flatc generates for Message:
https://gist.github.com/wesm/3a9c2106b3a3634d8af1e6e66260836f
When you serialize the header, you aren't calling Message.addHeader.
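A sketch of what actually storing the payload in the union would look like with the generated builder methods (offsets elided; illustrative only, not the PR's code):

```java
import com.google.flatbuffers.FlatBufferBuilder;
import org.apache.arrow.flatbuf.Message;
import org.apache.arrow.flatbuf.MessageHeader;

public class UnionWriteSketch {
  // Put the already-built RecordBatch table inside the Message's header
  // union rather than writing it as a separate flatbuffer.
  static int buildMessage(FlatBufferBuilder builder, int recordBatchOffset,
      long bodyLength) {
    Message.startMessage(builder);
    Message.addHeaderType(builder, MessageHeader.RecordBatch); // union discriminant
    Message.addHeader(builder, recordBatchOffset);             // union payload
    Message.addBodyLength(builder, bodyLength);
    return Message.endMessage(builder);
  }
}
```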
-bodyLength: long;
+length: int;
The problem with combining the metadata and body length is that it makes it difficult / impossible to do partial reads / field projections in a record batch. If you can inspect the metadata without reading the body, you can determine the byte ranges you need to read only a certain subset of fields.
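A sketch of that projection idea: with a separate metadata length you can parse just the metadata, then issue ranged reads for only the buffers you need (Buffer accessor names assumed from the generated flatbuffers):

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SeekableByteChannel;
import org.apache.arrow.flatbuf.Buffer;
import org.apache.arrow.flatbuf.RecordBatch;

public class ProjectionSketch {
  // Read a single buffer of the batch, skipping everything else;
  // bodyStart is the file offset where the message body begins.
  static ByteBuffer readBuffer(SeekableByteChannel in, long bodyStart,
      RecordBatch batch, int bufferIndex) throws IOException {
    Buffer b = batch.buffers(bufferIndex); // offset/length come from metadata alone
    ByteBuffer dst = ByteBuffer.allocate((int) b.length());
    in.position(bodyStart + b.offset());   // ranged read into the body
    in.read(dst);
    dst.flip();
    return dst;
  }
}
```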
Use unions in Java, simplify record batch deserialization, change C++ to use Message type
See wesm@c4059e6 -- that was at least one additional problem with the deserialization path for files.
I'm not sure what else is wrong.
Also, I don't quite understand the semantics of when you need to call …
With this commit wesm@367367b, the tests pass except for 1, but I think the failure is correct.
OK, if you pull this commit wesm@97d6120, you should get a green build including integration tests.
Can you update the title to start with ARROW-499?
@wesm Done
+1. Thank you very much!
LGTM