From 1de83a7560f85cd347bca6dde256d551da63a144 Mon Sep 17 00:00:00 2001
From: chutium
Date: Wed, 16 Jul 2014 13:44:09 +0200
Subject: [PATCH 1/5] SPARK-2407: Added Parser of SQL SUBSTR()

---
 .../scala/org/apache/spark/sql/catalyst/SqlParser.scala | 8 +++++++-
 1 file changed, 7 insertions(+), 1 deletion(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
index e5653c5b14ac1..785cb1e84b587 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
@@ -120,7 +120,7 @@ class SqlParser extends StandardTokenParsers with PackratParsers {
   protected val WHERE = Keyword("WHERE")
   protected val INTERSECT = Keyword("INTERSECT")
   protected val EXCEPT = Keyword("EXCEPT")
-
+  protected val SUBSTR = Keyword("SUBSTR")
 
   // Use reflection to find the reserved words defined in this class.
   protected val reservedWords =
@@ -316,6 +316,12 @@ class SqlParser extends StandardTokenParsers with PackratParsers {
     IF ~> "(" ~> expression ~ "," ~ expression ~ "," ~ expression <~ ")" ^^ {
       case c ~ "," ~ t ~ "," ~ f => If(c,t,f)
     } |
+    SUBSTR ~> "(" ~> expression ~ "," ~ expression <~ ")" ^^ {
+      case s ~ "," ~ p => Substring(s,p,Literal(Integer.MAX_VALUE))
+    } |
+    SUBSTR ~> "(" ~> expression ~ "," ~ expression ~ "," ~ expression <~ ")" ^^ {
+      case s ~ "," ~ p ~ "," ~ l => Substring(s,p,l)
+    } |
     ident ~ "(" ~ repsep(expression, ",") <~ ")" ^^ {
       case udfName ~ _ ~ exprs => UnresolvedFunction(udfName, exprs)
     }

From 9a60ccf4938dc921e143c27276c19bda59180e4b Mon Sep 17 00:00:00 2001
From: chutium
Date: Fri, 18 Jul 2014 01:24:16 +0200
Subject: [PATCH 2/5] SPARK-2407: Added Parser of SQL SUBSTR() #1442

---
 .../test/scala/org/apache/spark/sql/SQLQuerySuite.scala | 9 +++++++++
 1 file changed, 9 insertions(+)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 0743cfe8cff0f..bcd97bf5fcbb1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -36,6 +36,15 @@ class SQLQuerySuite extends QueryTest {
       "test")
   }
 
+  test("SPARK-2407 Added Parser of SQL SUBSTR()") {
+    checkAnswer(
+      sql("SELECT substr(tableName, 1, 2) FROM tableName"),
+      "te")
+    checkAnswer(
+      sql("SELECT substr(tableName, 3) FROM tableName"),
+      "st")
+  }
+
   test("index into array") {
     checkAnswer(
       sql("SELECT data, data[0], data[0] + data[1], data[0 + 1] FROM arrayData"),
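A note on the grammar added in PATCH 1/5: the two-argument alternative can safely come before the three-argument one because Scala's parser combinators backtrack on "|"; on SUBSTR(s, p, l) the first alternative fails at the second comma and the parser retries the next alternative. The following is a minimal, self-contained sketch of the same shape, not the catalyst code: the Substr case class is a stand-in for catalyst's Substring expression, ident/numericLit replace the full expression rule, and the SUBSTRING alias from PATCH 3/5 is folded in.

  import scala.util.parsing.combinator.PackratParsers
  import scala.util.parsing.combinator.syntactical.StandardTokenParsers

  // Stand-in for catalyst's Substring expression.
  case class Substr(str: String, pos: Int, len: Int)

  object SubstrToy extends StandardTokenParsers with PackratParsers {
    lexical.reserved ++= Seq("SUBSTR", "SUBSTRING")
    lexical.delimiters ++= Seq("(", ")", ",")

    // Two-argument form first: on SUBSTR(s, p, l) it fails at the second
    // comma and "|" backtracks into the three-argument alternative.
    lazy val call: Parser[Substr] =
      ("SUBSTR" | "SUBSTRING") ~> "(" ~> ident ~ ("," ~> numericLit) <~ ")" ^^ {
        case s ~ p => Substr(s, p.toInt, Int.MaxValue) // MAX_VALUE means "to end of string"
      } |
      ("SUBSTR" | "SUBSTRING") ~> "(" ~> ident ~ ("," ~> numericLit) ~ ("," ~> numericLit) <~ ")" ^^ {
        case s ~ p ~ l => Substr(s, p.toInt, l.toInt)
      }

    def parse(in: String): ParseResult[Substr] = phrase(call)(new lexical.Scanner(in))
  }

  // SubstrToy.parse("SUBSTR(name, 1, 2)").get  == Substr("name", 1, 2)
  // SubstrToy.parse("SUBSTRING(name, 3)").get  == Substr("name", 3, Int.MaxValue)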
Keyword("EXCEPT") protected val SUBSTR = Keyword("SUBSTR") + protected val SUBSTRING = Keyword("SUBSTRING") // Use reflection to find the reserved words defined in this class. protected val reservedWords = @@ -316,10 +317,10 @@ class SqlParser extends StandardTokenParsers with PackratParsers { IF ~> "(" ~> expression ~ "," ~ expression ~ "," ~ expression <~ ")" ^^ { case c ~ "," ~ t ~ "," ~ f => If(c,t,f) } | - SUBSTR ~> "(" ~> expression ~ "," ~ expression <~ ")" ^^ { + (SUBSTR | SUBSTRING) ~> "(" ~> expression ~ "," ~ expression <~ ")" ^^ { case s ~ "," ~ p => Substring(s,p,Literal(Integer.MAX_VALUE)) } | - SUBSTR ~> "(" ~> expression ~ "," ~ expression ~ "," ~ expression <~ ")" ^^ { + (SUBSTR | SUBSTRING) ~> "(" ~> expression ~ "," ~ expression ~ "," ~ expression <~ ")" ^^ { case s ~ "," ~ p ~ "," ~ l => Substring(s,p,l) } | ident ~ "(" ~ repsep(expression, ",") <~ ")" ^^ { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index bcd97bf5fcbb1..6736189c96d4b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -43,6 +43,12 @@ class SQLQuerySuite extends QueryTest { checkAnswer( sql("SELECT substr(tableName, 3) FROM tableName"), "st") + checkAnswer( + sql("SELECT substring(tableName, 1, 2) FROM tableName"), + "te") + checkAnswer( + sql("SELECT substring(tableName, 3) FROM tableName"), + "st") } test("index into array") { From 533332905c68b9fe8f7a9b9175a80f18710cc6fb Mon Sep 17 00:00:00 2001 From: chutium Date: Sat, 26 Jul 2014 04:53:44 +0200 Subject: [PATCH 4/5] SPARK-2700 Hidden files (such as .impala_insert_staging) should be filtered out by sqlContext.parquetFile --- .../scala/org/apache/spark/sql/parquet/ParquetTypes.scala | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala index 58370b955a5ec..0e26ea498efd3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala @@ -369,9 +369,10 @@ private[parquet] object ParquetTypesConverter extends Logging { } ParquetRelation.enableLogForwarding() - val children = fs.listStatus(path).filterNot { - _.getPath.getName == FileOutputCommitter.SUCCEEDED_FILE_NAME - } + val children = fs.listStatus(path).filterNot ( + status => (status.getPath.getName.charAt(0) == '.' || + status.getPath.getName == FileOutputCommitter.SUCCEEDED_FILE_NAME) + ) // NOTE (lian): Parquet "_metadata" file can be very slow if the file consists of lots of row // groups. 
From 3b85e2fa098c22050bc3a2ebe5fec7a1f6fe25e1 Mon Sep 17 00:00:00 2001
From: chutium
Date: Sat, 26 Jul 2014 05:50:59 +0200
Subject: [PATCH 5/5] SPARK-2699 Improve compatibility with parquet file/table

---
 .../apache/spark/sql/parquet/ParquetTypes.scala | 16 ++++++++++++----
 1 file changed, 12 insertions(+), 4 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala
index 0e26ea498efd3..c17cf3890d86e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala
@@ -43,10 +43,12 @@ private[parquet] object ParquetTypesConverter extends Logging {
   def isPrimitiveType(ctype: DataType): Boolean =
     classOf[PrimitiveType] isAssignableFrom ctype.getClass
 
+  private var allowBinaryType: Boolean = true
+
   def toPrimitiveDataType(parquetType: ParquetPrimitiveType): DataType =
     parquetType.getPrimitiveTypeName match {
       case ParquetPrimitiveTypeName.BINARY
-        if parquetType.getOriginalType == ParquetOriginalType.UTF8 => StringType
+        if (parquetType.getOriginalType == ParquetOriginalType.UTF8 || !allowBinaryType) => StringType
       case ParquetPrimitiveTypeName.BINARY => BinaryType
       case ParquetPrimitiveTypeName.BOOLEAN => BooleanType
       case ParquetPrimitiveTypeName.DOUBLE => DoubleType
@@ -369,7 +371,9 @@ private[parquet] object ParquetTypesConverter extends Logging {
     }
     ParquetRelation.enableLogForwarding()
 
-    val children = fs.listStatus(path).filterNot (
+    allowBinaryType = conf.getBoolean("parquet.binarytype", true)
+
+    val children = fs.listStatus(path).filterNot(
       status => (status.getPath.getName.charAt(0) == '.' ||
        status.getPath.getName == FileOutputCommitter.SUCCEEDED_FILE_NAME)
     )
@@ -407,8 +411,12 @@ private[parquet] object ParquetTypesConverter extends Logging {
     if (keyValueMetadata.get(RowReadSupport.SPARK_METADATA_KEY) != null) {
       convertFromString(keyValueMetadata.get(RowReadSupport.SPARK_METADATA_KEY))
     } else {
-      val attributes = convertToAttributes(
-        readMetaData(origPath, conf).getFileMetaData.getSchema)
+      val fileMetaData = readMetaData(origPath, conf).getFileMetaData
+      if (fileMetaData.getCreatedBy.contains("impala")) {
+        allowBinaryType = false
+        log.info(s"Impala parquet file found, BinaryType disabled")
+      }
+      val attributes = convertToAttributes(fileMetaData.getSchema)
       log.info(s"Falling back to schema conversion from Parquet types; result: $attributes")
       attributes
     }
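The flag introduced in PATCH 5/5 is user-facing, so a short usage sketch may help. The key "parquet.binarytype" and its default of true come straight from the diff above; how the Configuration instance reaches readMetaData depends on the caller, so treat the wiring below as an assumption.

  import org.apache.hadoop.conf.Configuration

  val conf = new Configuration()
  // Default is true: un-annotated BINARY columns map to BinaryType.
  // Setting false makes toPrimitiveDataType fall back to StringType,
  // matching files written by tools (such as Impala) that omit the
  // UTF8 annotation on string columns.
  conf.setBoolean("parquet.binarytype", false)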