Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Spark 3.4: Multiple shuffle partitions per file in compaction #7897

Merged
merged 4 commits into from
Jun 27, 2023

Conversation

aokolnychyi
Copy link
Contributor

This PR adds a new compaction option called shuffle-partitions-per-file for shuffle-based file rewriters.

By default, our shuffling file rewriters assume each shuffle partition would become a separate output file. Attempting to generate large output files of 512 MB and more may strain the memory resources of the cluster as such rewrites would require lots of Spark memory. This parameter can be used to further divide up the data which will end up in a single file. For example, if the target file size is 2 GB, but the cluster can only handle shuffles of 512 MB, this parameter could be set to 4. Iceberg will use a custom coalesce operation to stitch these sorted partitions back together into a single sorted file.

@github-actions github-actions bot added the spark label Jun 24, 2023
@@ -59,7 +61,24 @@ abstract class SparkShufflingDataRewriter extends SparkSizeBasedDataRewriter {

public static final double COMPRESSION_FACTOR_DEFAULT = 1.0;

/**
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tested the current implementation on a table with 1 TB of data and a cluster with 16 GB executors 7 cores each. The target file size is 1 GB (zstd Parquet data). Sort-based optimizations without this option were spilling and failed, I lost all executors one by one. I tried using 8 shuffle partitions per file and the operation succeeded without any failures and produced properly sized files.

import org.apache.spark.sql.catalyst.plans.physical.SinglePartition
import org.apache.spark.sql.catalyst.plans.physical.UnknownPartitioning

case class OrderAwareCoalesceExec(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Inspired by CoalesceExec in Spark.

@aokolnychyi
Copy link
Contributor Author

cc @szehon-ho @flyrain @RussellSpitzer @singhpk234 @amogh-jahagirdar @rdblue

Copy link
Contributor

@singhpk234 singhpk234 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, Thanks @aokolnychyi, this is a really nice addition to the compaction !

wondering if you are targeting updating the doc, with this new param in procedure, in a separate pr : https://iceberg.apache.org/docs/latest/spark-procedures/#usage-7


List<Object[]> output =
sql(
"CALL %s.system.rewrite_data_files("
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we also assert that OrderAwareCoaleseExec is inserted by inspecting the plan ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is a bit tricky in this case as the result plan would be CallExec, we don't have an easy way to inspect the triggered plan from the procedure. I did check manually, though.

@aokolnychyi
Copy link
Contributor Author

@singhpk234, I was originally planning to update the doc in a separate PR.

Copy link
Collaborator

@szehon-ho szehon-ho left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good to me. This works for sorted data, because we always use range partitioning for sort, right?

@@ -225,6 +249,43 @@ public void testRewriteDataFilesWithZOrder() {
assertEquals("Should have expected rows", expectedRows, sql("SELECT * FROM %s", tableName));
}

@Test
Copy link
Collaborator

@szehon-ho szehon-ho Jun 26, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is nice, but did we also add a test that assert the sort order is preserved within partition? (ex, small partition, and just assert that that the file is in order)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a check below for the order of records. I just added a similar one for the regular sort, so we verify the order of records is correct both in regular sorts and in z-ordering.

/**
* The number of shuffle partitions to use for each output file. By default, this file rewriter
* assumes each shuffle partition would become a separate output file. Attempting to generate
* large output files of 512 MB and more may strain the memory resources of the cluster as such
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

and more => or higher

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed.

*
* <p>Note using this parameter requires enabling Iceberg Spark session extensions.
*/
public static final String SHUFFLE_PARTITIONS_PER_FILE = "shuffle-partitions-per-file";
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not to block this change, but did we consider having shuffle-threshold? Ie, if we have some partition with 2G but others that are way less than 512MB, no need to shuffle the ones that are less?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You mean like switching to a local sort if the size of the data to compact is small?

Copy link
Collaborator

@szehon-ho szehon-ho Jun 26, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was just wondering the use case, where we set shuffle-partitions-per-file to 4, because we want 2GB files but can only shuffle 512mb. However, consider an Iceberg partition (rewrite group) that has only 512MB files during this rewrite. Will we still shuffle to four partitions in this case and coalesce at end, unnecessarily? I may be missing something.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It should be still fine to apply this optimization as there is no extra cost. I achieved best results with 128 MB shuffle blocks so it should be fairly safe to assume the operation would complete fine.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see, but would there be issues in contending for pods. Also wouldn't it make more sense to have 128MB as a conf (shuffle-threshold), otherwise its always a bit dynamic depending on the max partition size? Not sure if there are other issues with this approach.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To be honest, I have never seen issues with this approach in any of our prod jobs in the last few years. Not applying this split if the size of the job is less than 128MB could be a valid step but it would require quite a bit of changes to pass more info around. I'd probably skip it for now until we experience any issues.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, we can do it later then if there's a need

@aokolnychyi
Copy link
Contributor Author

I just realized we don't provide a comprehensive list of supported options in the docs. I have been meaning to improve our docs for a while, so I'll add this config then.

@aokolnychyi aokolnychyi merged commit d98e7a1 into apache:master Jun 27, 2023
rodmeneses pushed a commit to rodmeneses/iceberg that referenced this pull request Feb 19, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants