Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Copy into partition by #5964

Merged
merged 40 commits into from
Feb 1, 2023
Merged

Conversation

samansmink
Copy link
Contributor

@samansmink samansmink commented Jan 23, 2023

This PR introduces a first version of the partitioned COPY operator. This PR builds on previous work from @lnkuiper adding the PartitionedColumnData (#4970) and the per thread output from @hannes (#5412)

To summarize, the Partitioned COPY:

  • Supports both CSV and Parquet
  • Currently fully materializes data during partitioning (to be improved in follow-up pr's)
  • Outputs 1 file per partition per thread (similar to PER_THREAD_OUTPUT flag for COPY  #5412)

The partitioned write is used similarly to the PER_THREAD_OUTPUT feature:

COPY table TO '__TEST_DIR__/partitioned' (FORMAT PARQUET, PARTITION_BY (part_col_a, part_col_b));

this command will write files in this format, which is known as the hive partitioning scheme:

__TEST_DIR__/partitioned/part_col_a=<val>/part_col_b=<val>/data_<thread_number>.parquet

Partioned copy to S3 also works:

COPY table TO 's3://mah-bucket/partitioned' (FORMAT PARQUET, PARTITION_BY (part_col_a, part_col_b));

Finally, a check is performed for existing files/directories which is currently quite conservative (and on S3 will add a bit of latency). To disable this check and force writing, an ALLOW_OVERWRITE flag is added:

COPY table TO '__TEST_DIR__/partitioned' (FORMAT PARQUET, PARTITION_BY (part_col_a, part_col_b), ALLOW_OVERWRITE TRUE);

Note that this also works with the PER_THREAD_OUTPUT feature.

Implementation

To support these features, a new class HivePartitionedColumnData is introduced, which implements the PartitionedColumnData interface from #4970. The main complexity here is that the HivePartitionedColumnData class needs to be able to discover new partitions in parallel. For the RadixPartitioning that was already implemented, the number of partitions is known in advance and does not change. Partition discovery is handled by all threads while writing tuples to their local HivePartitionedColumnData. To prevent expensive locking when synchronizing the partition discovery, each thread will have a local partition map to do quick lookups during partitioning, with a shared global state where new partitions are added. This means that only when adding new partitions to the global state a lock is required. Since this partitioned write is not expected to scale to super large amounts of partitions anyway, this should work well. Due to this shared state the partition indices between the thread local HivePartitionedColumnData objects will remain in sync.

Benchmarks

Here's some rough benchmarks to give an indication of the performance overhead on a M1 macbook:

COPY lineitem_sf1 to file.parquet w/ 8 threads Avg Relative to fastest
Regular copy 7.50 443.98%
Disable preserve order (#5756) 1.40 1.64%
Threads (#5412) 1.38 0.00%
Threads + disable order preserving 1.42 2.73%
Hive Partitioned (4 partitions over 2 cols) 1.62 17.71%
Hive Partitioned (28 partitions over 3 cols) 1.92 38.93%
Hive Partitioned (84 paritions over 2 cols) 1.89 36.93%
Hive Partitioned (432 paritions over 3 cols) 2.58 86.83%
Hive Partitioned (1160 paritions over 5 cols) 5.67 310.78%
Hive Partitioned (2526 partitions over 1 col) 11.15 708.10%

Note that performance for low amounts of partitions is very good. Higher amounts of partitions get pretty bad, but this is most likely due to the fact that at this partition count, the resulting files only contain very few tuples, leading to large IO overhead. I would expect that for larger files the relative overhead will be lower, but more benchmarking here is required.

Transform partition columns before writing

Note that partition values are converted to strings. There are probably many edge cases where this won't work nicely. However, just using SQL in your copy statement you can transform the partition by columns however you want. For example to add a prefix to an existing column call part_col you could do:

COPY (SELECT * EXCLUDE (part_col), 'prefix-'::VARCHAR || part_col::VARCHAR as part_col FROM test) 
TO '__TEST_DIR__/partitioned' (FORMAT PARQUET, PARTITION_BY (part_col));

Limitations

The partitioned write fully materializes, so it will quickly fill the available memory for large tables. This is not necessarily a blocker as the buffer manager can offload to disk. This will however mean that enough local disk space is required and for large files the data is:

  • stored in memory
  • offloaded to disk
  • read from disk
  • written to the final partition file

This is not ideal, but will still be good enough for many use cases.

Future work

The ideal partitioning COPY would produce 1 file per partition while providing mechanisms to limit memory usage and not require full materialization.

First step towards this goal is to make a streaming variant that can flush partitions as partition tuple limits (or possibly global operator limits) are reached. This would produce a single file per partition, per flush of that partition.

The second step would be to not close files after flushing, allowing multiple flushes to a single file. With this we would achieve the desired behaviour where we can produce a single file per partition in a streaming fashion.

We have some nice idea's on implementing the above, so hopefully two more PR's coming up soon-ish :)

@samansmink
Copy link
Contributor Author

Perfect, now just add partitions column 🤡😜

@djouallah there ya go 😁

Copy link
Contributor

@lnkuiper lnkuiper left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The code looks great, and the performance numbers are awesome! Performance seems to degrade quite slowly, even with ~400 partitions. In the example with 2526 partitions, I think the performance degradation is not only due to I/O, but also due to writing the data to the in-memory partitions. With so few partitions, we are basically appending 1-2 tuples to each of the partitions at a time, which brings us down to tuple-at-a-time processing 😓 not much you can do about this, though. If you're interested, you could try sorting the data by the column that you're partitioning on. This should greatly speed up the partitioning code since more tuples are being written to a partition at a time.

Just one comment: Is it worth making a .benchmark file so that the CI checks that the performance does not degrade?

@tobilg
Copy link

tobilg commented Jan 24, 2023

Great work, this will be a killer feature for DuckDB! Do you have plans to support writing to S3 as well in the future?

@samansmink
Copy link
Contributor Author

@lnkuiper thanks for the review, will add some benchmarks later today!

@tobilg wow somehow I completely forgot to test this.. Thanks for the comment 😅 This feature should just work™ with S3, but it may need some additional tweaks. Ill add the tests later today. An important use case for the partitioning feature is certainly to be able to repartition a dataset from and to S3 in a fully streaming fashion.

@tobilg
Copy link

tobilg commented Jan 24, 2023

This feature should just work™ with S3, but it may need some additional tweaks. Ill add the tests later today. An important use case for the partitioning feature is certainly to be able to repartition a dataset from and to S3 in a fully streaming fashion

Thanks, that's great news! Will there be a limitation regarding the number of partitions?

@samansmink
Copy link
Contributor Author

@tobilg There's for sure going to be a prohibitive overhead when partitioning with very large amounts of partitions, see also the comment by @lnkuiper and the benchmarks in this PR. However, with the optimizations we have planned (and perhaps the sorting optimization proposed by @lnkuiper) we should be able to support decently large amounts of partitions.

@Mytherin
Copy link
Collaborator

With so few partitions, we are basically appending 1-2 tuples to each of the partitions at a time, which brings us down to tuple-at-a-time processing 😓 not much you can do about this,

This is similar to the problem we encounter in hash tables - what we do about this there is that we divide the partitioning in two passes. First we figure out for each tuple where it should be written - then we scatter to multiple locations at once. Perhaps something similar can be done to solve this problem with partitioning?

@tobilg
Copy link

tobilg commented Jan 24, 2023

we should be able to support decently large amounts of partitions.

That'll be a great and very useful feature then! Other common ways on AWS to repartition parquet data in S3 have pretty tight limits, e.g. Athena CTAS queries can only write to 100 partitions simulaneously. One can work around this by using multiple passed, or pre-partitioning the data, but the DX is not great.

@lnkuiper
Copy link
Contributor

@Mytherin This is a good idea. It is tricky to implement, however, because we are not scattering to a row layout but appending to an intermediate buffer, which is a DataChunk. Maybe we can discuss the options tomorrow.

@samansmink
Copy link
Contributor Author

@lnkuiper @Mytherin I added two simple microbenchmarks which i've also added to the .github/regression/micro.csv however, since these write to files i'm not 100% this will work well as it may trigger false positives for regression

@tobilg Added the S3 tests. There were also some minor issues that are now resolved. So S3 seems to work fine!

Copy link
Collaborator

@Mytherin Mytherin left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the PR! Very exciting feature. Some comments from my side:

src/common/hive_partitioning.cpp Outdated Show resolved Hide resolved
src/common/hive_partitioning.cpp Show resolved Hide resolved
test/sql/copy/hive_filter_pushdown_bug.test Show resolved Hide resolved
third_party/libpg_query/src_backend_parser_scan.cpp Outdated Show resolved Hide resolved
Copy link
Collaborator

@Mytherin Mytherin left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the fixes! One minor comment, otherwise looks great:

src/common/bind_helpers.cpp Outdated Show resolved Hide resolved
Copy link
Collaborator

@Mytherin Mytherin left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the updates - LGTM! Ready to merge after CI passes.

@samansmink
Copy link
Contributor Author

@Mytherin
there's 2 failures left:

  • codecov fails with http error, can be restarted but it has passed in previous runs with little changes so i think its fine
  • odbc failure on linux aarch64 is also http thing and unrelated, it also has succeeded in previous runs

@Mytherin Mytherin merged commit 068c0fd into duckdb:master Feb 1, 2023
@Mytherin
Copy link
Collaborator

Mytherin commented Feb 1, 2023

Thanks! Indeed everything looks good.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants