
[SPARK-20694][DOCS][SQL] Document DataFrameWriter partitionBy, bucketBy and sortBy in SQL guide #17938

Closed · wants to merge 20 commits · Changes from 16 commits
107 changes: 107 additions & 0 deletions docs/sql-programming-guide.md
@@ -581,6 +581,113 @@ Starting from Spark 2.1, persistent datasource tables have per-partition metadat

Note that partition information is not gathered by default when creating external datasource tables (those with a `path` option). To sync the partition information in the metastore, you can invoke `MSCK REPAIR TABLE`.
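As an illustrative sketch (not part of this PR; the table name, path, and schema are hypothetical, and an active `SparkSession` named `spark` is assumed):

```scala
// Create an external datasource table over data already laid out in
// partition directories such as /data/events/dt=2017-05-01/.
// Partition information is NOT gathered automatically at creation time.
spark.sql(
  """CREATE TABLE events (id INT, dt STRING)
    |USING parquet
    |OPTIONS (path '/data/events')
    |PARTITIONED BY (dt)""".stripMargin)

// Sync the partition directories found under the path into the metastore;
// until this runs, queries against the table may see no partitions.
spark.sql("MSCK REPAIR TABLE events")
```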

### Bucketing, Sorting and Partitioning
Contributor (@tejasapatil):

I feel the examples are missing a write to a partitioned + bucketed table, e.g.

`my_dataframe.write.format("orc").partitionBy("i").bucketBy(8, "j", "k").sortBy("j", "k").saveAsTable("my_table")`

There are multiple possible orderings of `partitionBy`, `bucketBy` and `sortBy` calls. Not all of them are supported, and not all of them would produce correct outputs. I have not done an exhaustive study, but I think this should be documented to guide people using these APIs.

Contributor (@cloud-fan):

Shall we emphasize partitioning? I think it's more widely used than bucketing.

Member Author (@zero323):

@tejasapatil

> There could be multiple possible orderings of partitionBy, bucketBy and sortBy calls. Not all of them are supported, not all of them would produce correct outputs.

Shouldn't the output be the same no matter the order? `sortBy` is not applicable to `partitionBy` and takes precedence over `bucketBy`, if both are present. This is Hive's behaviour, if I am not mistaken, and at first glance Spark is doing the same thing. Is there any gotcha here?

Member Author (@zero323):

@cloud-fan I think we can redirect to partition discovery here. But explaining the difference and possible applications (low vs. high cardinality) could be a good idea.

Contributor (@tejasapatil):

> Shouldn't the output be the same no matter the order?

Theoretically yes. Practically, I don't know what happens. Since you are documenting this, it would be worthwhile to check and record whether it works as expected (or if there is any weirdness).

Member Author (@zero323):

Oh, I thought you were implying there are some known issues. This actually behaves sensibly: all supported options seem to work independently of the order, and unsupported ones (`partitionBy` + `sortBy` without `bucketBy`, or overlapping `bucketBy` and `partitionBy` columns) give enough feedback to diagnose the issue.

I haven't tested this with large datasets though, so there could be hidden issues.
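For reference, a minimal sketch of the kind of check described above (not from the PR itself; `usersDF` mirrors the `users.parquet` example used elsewhere in this guide, the table names are hypothetical, and the exact exception types may vary by Spark version):

```scala
val usersDF = spark.read.parquet("examples/src/main/resources/users.parquet")

// Supported combinations appear order-independent: both calls should yield
// a table partitioned by favorite_color and bucketed by name.
usersDF.write.partitionBy("favorite_color").bucketBy(42, "name").saveAsTable("t1")
usersDF.write.bucketBy(42, "name").partitionBy("favorite_color").saveAsTable("t2")

// Unsupported: sortBy without bucketBy is rejected up front.
try {
  usersDF.write.partitionBy("favorite_color").sortBy("name").saveAsTable("t3")
} catch {
  case e: IllegalArgumentException => println(e.getMessage)
}

// Unsupported: bucketing a non-persistent output (plain save).
try {
  usersDF.write.bucketBy(42, "name").save("/tmp/users_bucketed")
} catch {
  case e: org.apache.spark.sql.AnalysisException => println(e.getMessage)
}
```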


For file-based data source it is also possible to bucket and sort or partition the output.
Member:

Nit: `For file-based data source it` -> `For file-based data source, it`

Bucketing and sorting is applicable only to persistent tables:
Member:

is applicable -> are applicable


<div class="codetabs">

<div data-lang="scala" markdown="1">
{% include_example write_sorting_and_bucketing scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala %}
</div>

<div data-lang="java" markdown="1">
{% include_example write_sorting_and_bucketing java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java %}
</div>

<div data-lang="python" markdown="1">
{% include_example write_sorting_and_bucketing python/sql/datasource.py %}
</div>

<div data-lang="sql" markdown="1">

{% highlight sql %}

CREATE TABLE users_bucketed_by_name(
name STRING,
favorite_color STRING,
favorite_numbers array<integer>
) USING parquet
CLUSTERED BY(name) INTO 42 BUCKETS;
Member:

To be consistent with the examples in the other APIs, this one is missing the `SORTED BY` clause.

Member:

Could you please use the same table name `people_bucketed` with the same column names in the example? Thanks!

Member:

@zero323 Could you also resolve this? Thanks!


{% endhighlight %}

</div>

</div>

while partitioning can be used with both `save` and `saveAsTable`:
Contributor:

Like @tejasapatil suggested, we should give one more example of a partitioned and bucketed table, so that users know they can use bucketing and partitioning at the same time.

Member:

Nit:

both `save` and `saveAsTable`

->

both `save` and `saveAsTable` when using the Dataset APIs. 



<div class="codetabs">

<div data-lang="scala" markdown="1">
{% include_example write_partitioning scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala %}
</div>

<div data-lang="java" markdown="1">
{% include_example write_partitioning java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java %}
</div>

<div data-lang="python" markdown="1">
{% include_example write_partitioning python/sql/datasource.py %}
</div>

<div data-lang="sql" markdown="1">

{% highlight sql %}

CREATE TABLE users_by_favorite_color(
name STRING,
favorite_color STRING,
favorite_numbers array<integer>
) USING csv PARTITIONED BY(favorite_color);

{% endhighlight %}

</div>

</div>

It is possible to use both partitions and buckets for a single table:
Member:

partitions and buckets -> partitioning and bucketing


<div class="codetabs">

<div data-lang="scala" markdown="1">
{% include_example write_partition_and_bucket scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala %}
</div>

<div data-lang="java" markdown="1">
{% include_example write_partition_and_bucket java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java %}
</div>

<div data-lang="python" markdown="1">
{% include_example write_partition_and_bucket python/sql/datasource.py %}
</div>

<div data-lang="sql" markdown="1">

{% highlight sql %}

CREATE TABLE users_bucketed_and_partitioned(
name STRING,
favorite_color STRING,
favorite_numbers array<integer>
) USING parquet
PARTITIONED BY (favorite_color)
CLUSTERED BY(name) INTO 42 BUCKETS;

{% endhighlight %}

</div>

</div>

`partitionBy` creates a directory structure as described in the [Partition Discovery](#partition-discovery) section.
Because of that it has limited applicability to columns with high cardinality. In contrast `bucketBy` distributes
Member (@gatorsmile) on May 14, 2017:

Because of that it has -> Thus, it has

Member:

In contrast bucketBy distributes -> In contrast, bucketBy distributes

data across fixed number of buckets and can be used if a number of unique values is unbounded.
Member:

used if -> used when
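To make the cardinality trade-off concrete, a small illustrative sketch (hypothetical output path and table name; `usersDF` as in the examples above):

```scala
// favorite_color has a handful of distinct values: one directory per value
// keeps partition discovery cheap.
usersDF.write.partitionBy("favorite_color").format("parquet").save("users_by_color.parquet")

// name is effectively unbounded: bucketing hashes it into a fixed number of
// files instead of creating one directory per distinct value.
usersDF.write.bucketBy(42, "name").sortBy("name").saveAsTable("users_bucketed_42")
```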


## Parquet Files

[Parquet](http://parquet.io) is a columnar format that is supported by many other data processing systems.
examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java

@@ -120,6 +120,22 @@ private static void runBasicDataSourceExample(SparkSession spark) {
Dataset<Row> sqlDF =
spark.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`");
// $example off:direct_sql$
// $example on:write_sorting_and_bucketing$
peopleDF.write().bucketBy(42, "name").sortBy("age").saveAsTable("people_bucketed");
// $example off:write_sorting_and_bucketing$
// $example on:write_partitioning$
usersDF.write().partitionBy("favorite_color").format("parquet").save("namesPartByColor.parquet");
// $example off:write_partitioning$
// $example on:write_partition_and_bucket$
peopleDF
.write()
.partitionBy("favorite_color")
.bucketBy(42, "name")
.saveAsTable("people_partitioned_bucketed");
// $example off:write_partition_and_bucket$

spark.sql("DROP TABLE IF EXISTS people_bucketed");
spark.sql("DROP TABLE IF EXISTS people_partitioned_bucketed");
}

private static void runBasicParquetExample(SparkSession spark) {
20 changes: 20 additions & 0 deletions examples/src/main/python/sql/datasource.py

@@ -35,15 +35,35 @@ def basic_datasource_example(spark):
df.select("name", "favorite_color").write.save("namesAndFavColors.parquet")
# $example off:generic_load_save_functions$

# $example on:write_partitioning$
df.write.partitionBy("favorite_color").format("parquet").save("namesPartByColor.parquet")
# $example off:write_partitioning$

# $example on:write_partition_and_bucket$
df = spark.read.parquet("examples/src/main/resources/users.parquet")
(df
.write
.partitionBy("favorite_color")
.bucketBy(42, "name")
.saveAsTable("people_partitioned_bucketed"))
# $example off:write_partition_and_bucket$

# $example on:manual_load_options$
df = spark.read.load("examples/src/main/resources/people.json", format="json")
df.select("name", "age").write.save("namesAndAges.parquet", format="parquet")
# $example off:manual_load_options$

# $example on:write_sorting_and_bucketing$
df.write.bucketBy(42, "name").sortBy("age").saveAsTable("people_bucketed")
# $example off:write_sorting_and_bucketing$

# $example on:direct_sql$
df = spark.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`")
# $example off:direct_sql$

spark.sql("DROP TABLE IF EXISTS people_bucketed")
spark.sql("DROP TABLE IF EXISTS people_partitioned_bucketed")


def parquet_example(spark):
# $example on:basic_parquet_example$
examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala

@@ -52,6 +52,22 @@ object SQLDataSourceExample {
// $example on:direct_sql$
val sqlDF = spark.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`")
// $example off:direct_sql$
// $example on:write_sorting_and_bucketing$
peopleDF.write.bucketBy(42, "name").sortBy("age").saveAsTable("people_bucketed")
// $example off:write_sorting_and_bucketing$
// $example on:write_partitioning$
usersDF.write.partitionBy("favorite_color").format("parquet").save("namesPartByColor.parquet")
// $example off:write_partitioning$
// $example on:write_partition_and_bucket$
peopleDF
.write
.partitionBy("favorite_color")
.bucketBy(42, "name")
.saveAsTable("people_partitioned_bucketed")
// $example off:write_partition_and_bucket$

spark.sql("DROP TABLE IF EXISTS people_bucketed")
spark.sql("DROP TABLE IF EXISTS people_partitioned_bucketed")
}

private def runBasicParquetExample(spark: SparkSession): Unit = {