
[SPARK-18572][SQL] Add a method listPartitionNames to ExternalCatalog #15998

Closed

Conversation

mallman
Contributor

@mallman mallman commented Nov 23, 2016

(Link to Jira issue: https://issues.apache.org/jira/browse/SPARK-18572)

What changes were proposed in this pull request?

Currently Spark answers the SHOW PARTITIONS command by fetching all of the table's partition metadata from the external catalog and constructing partition names therefrom. The Hive client has a getPartitionNames method which is many times faster for this purpose, with the performance improvement scaling with the number of partitions in a table.

To test the performance impact of this PR, I ran the SHOW PARTITIONS command on two Hive tables with large numbers of partitions. One table has ~17,800 partitions, and the other has ~95,000 partitions. For the purposes of this PR, I'll call the former table table1 and the latter table table2. I ran 5 trials for each table with before-and-after versions of this PR. The results are as follows:

Spark at bdc8153, SHOW PARTITIONS table1, times in seconds:
7.901
3.983
4.018
4.331
4.261

Spark at bdc8153, SHOW PARTITIONS table2
(Timed out after 10 minutes with a SocketTimeoutException.)

Spark at this PR, SHOW PARTITIONS table1, times in seconds:
3.801
0.449
0.395
0.348
0.336

Spark at this PR, SHOW PARTITIONS table2, times in seconds:
5.184
1.63
1.474
1.519
1.41

Taking the best times from each trial, we get a 12x performance improvement for a table with ~17,800 partitions and at least a 426x improvement for a table with ~95,000 partitions. More significantly, the latter command doesn't even complete with the current code in master.

This is actually a patch we've been using in-house at VideoAmp since Spark 1.1. It's made all the difference in the practical usability of our largest tables. Even with tables with about 1,000 partitions there's a performance improvement of about 2-3x.

How was this patch tested?

I added a unit test to VersionsSuite which tests that the Hive client's getPartitionNames method returns the correct number of partitions.

@mallman mallman changed the title [SPARK-18572][SQL] Add a method listPartitionName to ExternalCatalog [SPARK-18572][SQL] Add a method listPartitionNames to ExternalCatalog Nov 23, 2016
def listPartitionNames(
db: String,
table: String,
partialSpec: Option[TablePartitionSpec] = None): Seq[String] = {
Contributor

do we need the default implementation? maybe even just remove this since we only have two implementations for that.

Contributor Author

I could move this implementation to InMemoryCatalog, but it feels more natural to me to keep it in ExternalCatalog because it has no implementation-specific code.

Alternatively, I could refactor listPartitions in InMemoryCatalog to accommodate a simple implementation of listPartitionNames there. The runtime behavior would be essentially the same.
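The default implementation under discussion derives partition names from the full partition metadata returned by listPartitions. A self-contained Python sketch of that logic (the data shapes are illustrative stand-ins, not Spark's actual types):

```python
def list_partition_names(partitions, partition_columns):
    """Derive Hive-style partition names (col=value/...) from partition specs.

    `partitions` stands in for the full partition metadata the catalog
    returns; each entry maps a partition column name to its value.
    """
    return sorted(
        "/".join(f"{col}={spec[col]}" for col in partition_columns)
        for spec in partitions
    )
```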

Contributor

My worry is that when we create a new implementation of the external catalog, we'd forget to implement this, leading to bad performance.

@SparkQA

SparkQA commented Nov 24, 2016

Test build #69089 has finished for PR 15998 at commit f20ef8b.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@mallman
Contributor Author

mallman commented Nov 25, 2016

CC @ericl @cloud-fan


listPartitions(db, table, partialSpec).map { partition =>
partitionColumnNames.map { name =>
name + "=" + partition.spec(name)
Contributor

Does this need escaping, as provided by escapePathName?

Contributor Author

I think there should be column name and value escaping as these are column names and values. What's confusing to me is that the previous code uses a method called escapePathName which suggests something related to a filesystem path. Indeed the source code includes a comment stating that the aforementioned method comes from a Hive class called FileUtils.

I'll look into how Hive escapes partition names and values in its getPartitionNames method and report back.

Contributor Author

I looked into how Hive escapes partition names. It does not appear to do any escaping at all. The partition names are in a string column in a table in the metastore database. Hive simply queries that partition name column on this table, filtering by Hive database and table, sorts the results and returns them.

Contributor

Based on manual testing it seems that hive returns escaped values here. So for example, if you had a partition with column B and value ============, then the hive client returns b=%3D%3D%3D%3D%3D%3D%3D%3D%3D%3D%3D%3D. So this code should probably call getPathFragment to have the same behavior.
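The escaping described here is percent-encoding of characters that are unsafe in a path fragment. A minimal, self-contained Python sketch of that behavior (the unsafe-character set below is abridged and illustrative; Hive's FileUtils.escapePathName covers a longer list):

```python
# Abridged, illustrative set of characters treated as unsafe in a path
# fragment; Hive's FileUtils.escapePathName covers more.
_UNSAFE = set('"#%\'*/:=?\\{}[] ')

def escape_path_name(value: str) -> str:
    # Percent-encode each unsafe or control character as %XX (uppercase hex).
    return "".join(f"%{ord(c):02X}" if c in _UNSAFE or ord(c) < 0x20 else c
                   for c in value)
```

With this sketch, a value of twelve `=` characters escapes to twelve `%3D` sequences, matching the observed behavior above.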

*
* A partial partition spec may optionally be provided to filter the partitions returned.
* For instance, if there exist partitions (a='1', b='2'), (a='1', b='3') and (a='2', b='4'),
* then a partial spec of (a='1') will return the first two only.
Contributor

nit: newline here

Contributor Author

Newline added here and in comment for listPartitions.
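The partial-spec semantics described in that doc comment can be sketched in a few lines of self-contained Python (names are illustrative):

```python
def matches_partial_spec(spec, partial_spec):
    # A partition matches if it agrees with every column in the partial spec.
    return all(spec.get(col) == val for col, val in partial_spec.items())

def filter_partitions(specs, partial_spec):
    # Keep only the partitions whose specs match the partial spec.
    return [spec for spec in specs if matches_partial_spec(spec, partial_spec)]
```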


if (actualPartColNames.exists(partColName => partColName != partColName.toLowerCase)) {
clientPartitionNames.map { partName =>
val partSpec = PartitioningUtils.parsePathFragmentAsSeq(partName)
Contributor

Is the (un)escaping here correct? It would be nice to have a unit test to verify these edge cases.

Contributor Author

I added coverage for partition column names with uppercase characters to HiveCommandSuite.

Contributor Author

Sorry, I just realized that isn't exactly what you were asking about.

I think adding coverage for mixed case partition column names is a good thing, but I'll address path (un)escaping as well.

Contributor Author

I don't think this is correct, and based on what I've found in the Hive source code (and reading the DDL grammar on the Hive wiki) I'm going to push a change with another proposal for partition column name/value handling.

Contributor Author

I've run some tests to compare behavior between Hive and Spark in handling gnarly partition column names, and I found some disparities. We've spent a considerable amount of time wrangling with partition column name handling recently, and I'm not sure what semantics we've decided on. To ensure the behavior I'm seeing is what we're expecting, I want to describe a scenario I ran.

In my test scenario, I created a table named test with the stock Hive 2.1.0 distribution. (I simply downloaded it from its download page and initialized an empty Derby schema store.) The exact DDL I used to create this table is as follows:

create table test(a string) partitioned by (`P``Дr t` int);

When I do a describe test with Hive, it shows the column name as p`дr t. It appears to lowercase the P and the Cyrillic Д before storing the table schema in the metastore. I then run

alter table test add partition(`P``Дr t`=0);

When I run show partitions test in hive it gives me p`дr t=0. Additionally, when I list the contents of the test table's base directory in HDFS, the partition directory entry is

/user/hive/warehouse/test/p`дr t=0

If I drop the table, create it with spark-sql using the same DDL as before and do a describe test, the partition column is given as P`Дr t. Spark has preserved the case of the partition column name. If I then do

alter table test add partition(`P``Дr t`=0);

in spark-sql and show partitions test I get P`Дr t=0. When I list the directory contents in HDFS, I get

/user/hive/warehouse/test/P`Дr t=0

The upshot is that Hive is lowercasing the partition column name and Spark is leaving it unaltered. Is this correct?

Contributor

That's correct. As of 2.1 we go through some extremes to preserve the case of partition names by saving extra metadata inside the table storage properties. When the table is resolved in HiveExternalCatalog, the lowercase schema is reconciled with the saved original names.
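The reconciliation described here can be illustrated with a short, self-contained Python sketch (the function and data are hypothetical stand-ins for what HiveExternalCatalog does with the saved metadata, not its actual code):

```python
def restore_column_case(lowercased_columns, saved_original_columns):
    # The metastore returns lower-cased column names; the original-case
    # names saved in the table storage properties are keyed by their
    # lower-cased form and substituted back in.
    originals = {name.lower(): name for name in saved_original_columns}
    return [originals.get(name, name) for name in lowercased_columns]
```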

}.mkString("/")
}
} else {
clientPartitionNames
Contributor

Consider not having this optimization to avoid two different code paths here.

Contributor Author

I'll try a build with and without this code branch and see if there's a noticeable performance difference.

Contributor Author

I ran show partitions on a table with about 99k partitions using a build with and without this short circuit. I didn't keep count, but I must have run the command at least 10 times in each session. The best time I got for listing with the short circuit was 1.496 seconds. The best time I got for listing without the short circuit was 1.671.

I can't say I feel strongly one way or the other. I guess my vote is to keep the short circuit.

Contributor

Since this is not in the data path and the times are comparable, I think it would be better to drop the short circuit.

@SparkQA

SparkQA commented Nov 25, 2016

Test build #69174 has finished for PR 15998 at commit 4e03c3e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

table: CatalogTable,
partialSpec: Option[TablePartitionSpec] = None): Seq[String] = withHiveState {
partialSpec match {
case None => client.getPartitionNames(table.database, table.identifier.table, -1).asScala
Member

@gatorsmile gatorsmile Nov 27, 2016

Could you add a comment to explain what -1 means?

After reading the Hive code, this param is the maximum number of partition names to return.


// NB: some table partition column names in this test suite have upper-case characters to test
// column name case preservation. Do not lowercase these partition names without good reason.
sql("CREATE TABLE parquet_tab4 (price int, qty int) partitioned by (Year int, Month int)")
Member

The code changes are not related to this PR. How about submitting a separate PR for this purpose? These changes only cover Hive serde tables. We should create dedicated test cases if we do not already have them.

*
* @param db database name
* @param table table name
* @param partialSpec partition spec
Member

Nit: an extra space

val db = formatDatabaseName(tableName.database.getOrElse(getCurrentDatabase))
val table = formatTableName(tableName.table)
requireDbExists(db)
requireTableExists(TableIdentifier(table, Option(db)))
Member

Do we need to call requirePartialMatchedPartitionSpec here?

Contributor Author

@mallman mallman Nov 28, 2016

I don't know. The only method that calls that method is dropPartitions. It makes me wonder if there's a good reason for that.

Member

Without requirePartialMatchedPartitionSpec, do we detect these negative cases?

Contributor Author

Got it. I will add that in my next push.

Contributor Author

Done

@SparkQA

SparkQA commented Nov 27, 2016

Test build #69195 has finished for PR 15998 at commit 594c693.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Contributor

Where does the speed-up come from? Is it because the Hive API getPartitionNames is faster than getPartitions? Or is it because we generate the partition string (a=1/b=2/c=3) on the Hive side, and that's faster than Spark?

@mallman
Contributor Author

mallman commented Nov 28, 2016

Where does the speed-up come from? Is it because the Hive API getPartitionNames is faster than getPartitions? Or is it because we generate the partition string (a=1/b=2/c=3) on the Hive side, and that's faster than Spark?

To answer your question: for getPartitionNames, Hive does a simple SELECT on a table in the metastore. It does not transform the results; the partition names are stored directly in a column of that table. That's where the speed-up comes from.

@mallman
Contributor Author

mallman commented Nov 29, 2016

Hi Guys,

Repeating my comment/query for @ericl. I'm hoping someone can confirm or refute my reading before I proceed with new unit tests.

I've run some tests to compare behavior between Hive and Spark in handling gnarly partition column names, and I found some disparities. We've spent a considerable amount of time wrangling with partition column name handling recently, and I'm not sure what semantics we've decided on. To ensure the behavior I'm seeing is what we're expecting, I want to describe a scenario I ran.

In my test scenario, I created a table named test with the stock Hive 2.1.0 distribution. (I simply downloaded it from its download page and initialized an empty Derby schema store.) The exact DDL I used to create this table is as follows:

create table test(a string) partitioned by (`P``Дr t` int);

When I do a describe test with Hive, it shows the column name as p`дr t. It appears to lowercase the P and the Cyrillic Д before storing the table schema in the metastore. I then run

alter table test add partition(`P``Дr t`=0);

When I run show partitions test in hive it gives me p`дr t=0. Additionally, when I list the contents of the test table's base directory in HDFS, the partition directory entry is

/user/hive/warehouse/test/p`дr t=0

If I drop the table, create it with spark-sql using the same DDL as before and do a describe test, the partition column is given as P`Дr t. Spark has preserved the case of the partition column name. If I then do

alter table test add partition(`P``Дr t`=0);

in spark-sql and show partitions test I get P`Дr t=0. When I list the directory contents in HDFS, I get

/user/hive/warehouse/test/P`Дr t=0

The upshot is that Hive is lowercasing the partition column name and Spark is leaving it unaltered. Is this correct?

@ericl
Contributor

ericl commented Nov 29, 2016 via email

Contributor

@ericl ericl left a comment

Overall this looks good. I think there are some issues with escaping in other parts of Spark, but they do not seem related to this PR, so I'll file a separate ticket for those.

For example, I tried spark.sqlContext.range(10).selectExpr("id", "id as A", "'A$\\=%' as B").write.partitionBy("A", "B").mode("overwrite").saveAsTable("testy"), and this fails to write out a correctly formed table.



@@ -189,11 +189,28 @@ abstract class ExternalCatalog {
spec: TablePartitionSpec): Option[CatalogTablePartition]

/**
* List the names of all partitions that belong to the specified table, assuming it exists.
Contributor

Could you specify the return format of partitions (e.g. /a=v1,b=v2) and that the values are escaped, and that callers can decode them using the functions found in PartitioningUtils?

Contributor Author

PartitioningUtils is in SQL core, not catalyst. I decided to use the escapePathName and unescapePathName methods directly.

checkAnswer(
sql("SHOW PARTITIONS part_datasrc"),
Row("A=1") ::
Row("A=2") ::
Contributor

nit: indentation seems a bit funny here, could inline the rows

@ericl
Contributor

ericl commented Nov 29, 2016

Looks good once InMemoryCatalog is fixed.


listPartitions(db, table, partialSpec).map { partition =>
partitionColumnNames.map { name =>
escapePathName(name) + "=" + escapePathName(partition.spec(name))
Contributor Author

As mentioned above, PartitioningUtils is not available here so I went ahead with escapePathName itself.

@mallman
Contributor Author

mallman commented Nov 30, 2016

I will work on additional unit test coverage tomorrow.

@SparkQA

SparkQA commented Nov 30, 2016

Test build #69378 has finished for PR 15998 at commit 48ae2a3.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

* `p1=v1/p2=v2/p3=v3`. Each partition column name and value is an escaped path name, and can be
* decoded with the `ExternalCatalogUtils.unescapePathName` method.
*
* A partial partition spec may optionally be provided to filter the partitions returned, as
Contributor

can we just copy the document for the partial partition spec from listPartitions to here?

clientPartitionNames.map { partName =>
val partSpec = PartitioningUtils.parsePathFragmentAsSeq(partName)
partSpec.map { case (partName, partValue) =>
escapePathName(actualPartColNames.find(_.equalsIgnoreCase(partName)).get) + "=" +
Contributor

Can we just build a map with the lower-cased partition column name as key and the path-escaped actual partition column name as value? Then we can do a simple map lookup here, i.e. map.get(partName.toLowerCase).
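The suggestion replaces a per-fragment linear search with a single prebuilt map. A self-contained Python sketch of the idea (identity is used in place of path escaping for brevity; names are illustrative):

```python
def build_name_lookup(actual_part_col_names, escape=lambda s: s):
    # One pass over the columns: lower-cased name -> escaped actual name.
    return {name.lower(): escape(name) for name in actual_part_col_names}

def restore_part_name(part_name, lookup):
    # part_name looks like "year=2016/month=11"; each fragment's column
    # name is restored to its original case via a simple dict lookup.
    fragments = []
    for fragment in part_name.split("/"):
        col, _, value = fragment.partition("=")
        fragments.append(f"{lookup[col.lower()]}={value}")
    return "/".join(fragments)
```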

* Returns the partition names for the given table that match the supplied partition spec.
* If no partition spec is specified, all partitions are returned.
*/
def getPartitionNames(
Contributor

Why do we create 2 APIs? Who will call this method directly?

Contributor Author

Good question. I guess I'm just following the example of the two getPartitions methods.

@cloud-fan
Contributor

I think this PR is for a performance improvement, so why do we update the HiveCommandSuite here? Otherwise LGTM.

@mallman mallman force-pushed the spark-18572-list_partition_names branch from 48ae2a3 to b15d0d2 Compare December 1, 2016 18:05
@mallman
Contributor Author

mallman commented Dec 1, 2016

Added a couple of unit tests and rebased.

@mallman
Contributor Author

mallman commented Dec 1, 2016

LMK if there's anything else you'd like me to address; otherwise, assuming the tests pass, please merge to master. Also, it would be great if we could backport this to 2.1 as well.

@SparkQA

SparkQA commented Dec 1, 2016

Test build #69491 has finished for PR 15998 at commit b15d0d2.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@mallman
Contributor Author

mallman commented Dec 1, 2016

The test that failed is definitely related to this PR, however it passes for me locally. I'll investigate...

@SparkQA

SparkQA commented Dec 1, 2016

Test build #69501 has finished for PR 15998 at commit 3ad8c9a.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Contributor

@mallman do you know which tests fail the partition spec check? It looks to me like the partition column names should already be normalized before we call the partition-related APIs in SessionCatalog.

@mallman
Contributor Author

mallman commented Dec 5, 2016

@mallman do you know which tests fail the partition spec check? It looks to me like the partition column names should already be normalized before we call the partition-related APIs in SessionCatalog.

https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/69581/testReport/junit/org.apache.spark.sql.hive/PartitionProviderCompatibilitySuite/SPARK_18659_insert_overwrite_table_with_lowercase___partition_management_true/

@SparkQA

SparkQA commented Dec 5, 2016

Test build #69652 has finished for PR 15998 at commit 095cdd5.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Contributor

cloud-fan commented Dec 5, 2016

@mallman
Contributor Author

mallman commented Dec 5, 2016

@cloud-fan That's unfortunate if it's going to block this PR. How do we proceed?

@cloud-fan
Contributor

I think we can just fix it in this PR; it's only a change of several lines.

@mallman
Contributor Author

mallman commented Dec 5, 2016

@cloud-fan I'm not familiar enough with that code to be comfortable making that change. Can you submit a PR against VideoAmp:spark-18572-list_partition_names with the necessary changes to this file?

@gatorsmile
Member

#15998 (comment) found a bug. If this PR will not be merged to the Spark 2.1 branch, I think we need to submit a separate PR to resolve the bug.

@gatorsmile
Member

gatorsmile commented Dec 5, 2016

@mallman, based on my understanding, commit f4c48e1 below might be what @cloud-fan wants.

@mallman
Contributor Author

mallman commented Dec 5, 2016

@gatorsmile I can't find your commit:

[msa@ip-10-0-8-34 spark-master]$ git fetch origin
remote: Counting objects: 114, done.
remote: Compressing objects: 100% (53/53), done.
remote: Total 114 (delta 41), reused 22 (delta 22), pack-reused 15
Receiving objects: 100% (114/114), 51.13 KiB | 0 bytes/s, done.
Resolving deltas: 100% (41/41), completed with 22 local objects.
From github.com:apache/spark
   1821cbe..e23c8cf  branch-2.1 -> origin/branch-2.1
   eb8dd68..01a7d33  master     -> origin/master
[msa@ip-10-0-8-34 spark-master]$ git show f4c48e1d61dc4d1fc581b78dd9e5c948965acad5
fatal: bad object f4c48e1d61dc4d1fc581b78dd9e5c948965acad5

@mallman
Contributor Author

mallman commented Dec 5, 2016

#15998 (comment) found a bug. If this PR will not be merged to Spark 2.1 branch, I think we need to submit a separate PR for resolving the bug.

I would like to get this patch into Spark 2.1, as it addresses a scalability issue for partitioned tables.

@mallman mallman force-pushed the spark-18572-list_partition_names branch from 095cdd5 to 37fc595 Compare December 5, 2016 20:15
@mallman
Contributor Author

mallman commented Dec 5, 2016

@gatorsmile I've applied your patch and reverted the change I made in the previous commit to work around that defect. The failed test now passes for me. Let's see what Jenkins says.

@SparkQA

SparkQA commented Dec 5, 2016

Test build #69683 has finished for PR 15998 at commit 37fc595.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@mallman
Contributor Author

mallman commented Dec 5, 2016

I suspect this is a spurious, unrelated test failure. Can we get a rebuild, please?

@gatorsmile
Member

retest this please

@SparkQA

SparkQA commented Dec 6, 2016

Test build #69691 has finished for PR 15998 at commit 37fc595.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Contributor

LGTM, @rxin shall we backport this? Although it's not a bug, it's a scalability issue.

@rxin
Contributor

rxin commented Dec 6, 2016

Yeah, this one seems directly related to the work we did in 2.1 for large table handling.

catalog.createPartitions("db2", "tbl2", Seq(newPart), ignoreIfExists = false)

val partitionNames1 = catalog.listPartitionNames("db2", "tbl2", Some(Map("a" -> "1")))
assert(partitionNames1 == Seq("a=1/b=%25%3D", "a=1/b=2"))
Contributor

cc @gatorsmile @ericl is it the same as Hive? It looks weird to me that we return the "weird" value to users.

Member

Yeah, I tried Hive 1.2. It actually returns the weird value.

hive> create table partTab (col1 int, col2 int) partitioned by (pcol1 String, pcol2 String);
OK
hive> insert into table partTab partition(pcol1='1', pcol2='2') select 3, 4 from dummy;
OK
hive> insert into table partTab partition(pcol1='1', pcol2='%=') select 3, 4 from dummy;
OK
hive> show partitions partTab;
OK
pcol1=1/pcol2=%25%3D
pcol1=1/pcol2=2
hive> show partitions partTab PARTITION(pcol1=1);
OK
pcol1=1/pcol2=2
pcol1=1/pcol2=%25%3D
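For illustration, decoding such escaped values simply reverses the percent-encoding. A minimal, self-contained Python sketch (not Spark's unescapePathName, but the same transformation):

```python
import re

def unescape_path_name(name: str) -> str:
    # Turn each %XX sequence back into the character it encodes.
    return re.sub(r"%([0-9A-Fa-f]{2})",
                  lambda m: chr(int(m.group(1), 16)), name)
```

Applied to the output above, `pcol2=%25%3D` decodes back to `pcol2=%=`.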

Contributor

ok, maybe we should consider diverging from Hive here...

@cloud-fan
Contributor

thanks, merging to master/2.1!

asfgit pushed a commit that referenced this pull request Dec 6, 2016
…log`


Author: Michael Allman <[email protected]>

Closes #15998 from mallman/spark-18572-list_partition_names.

(cherry picked from commit 772ddbe)
Signed-off-by: Wenchen Fan <[email protected]>
@asfgit asfgit closed this in 772ddbe Dec 6, 2016
@mallman mallman deleted the spark-18572-list_partition_names branch December 6, 2016 17:45
robert3005 pushed a commit to palantir/spark that referenced this pull request Dec 15, 2016
uzadude pushed a commit to uzadude/spark that referenced this pull request Jan 27, 2017