Commit 8b885d6

resolve review comments

zhzhan committed May 14, 2015
1 parent dc1bfa1 commit 8b885d6

Showing 8 changed files with 157 additions and 237 deletions.
@@ -17,13 +17,12 @@

package org.apache.spark.sql.hive.orc

import org.apache.hadoop.hive.common.`type`.HiveVarchar
import org.apache.spark.sql.hive.{HiveInspectors, HiveShim}

import org.apache.hadoop.hive.serde2.objectinspector._
import org.apache.hadoop.hive.serde2.objectinspector.primitive._
import org.apache.spark.sql.catalyst.expressions.{Row, MutableRow}

import scala.collection.JavaConversions._
import org.apache.spark.sql.catalyst.expressions.MutableRow
import org.apache.spark.sql.hive.{HiveInspectors, HiveShim}

/**
* We can consolidate TableReader.unwrappers and HiveInspectors.wrapperFor to use
@@ -59,35 +58,5 @@ private[hive] object HadoopTypeConverter extends HiveInspectors {
/**
* Wraps with Hive types based on object inspector.
*/
def wrappers(oi: ObjectInspector): Any => Any = oi match {
case _: JavaHiveVarcharObjectInspector =>
(o: Any) => new HiveVarchar(o.asInstanceOf[String], o.asInstanceOf[String].size)

case _: JavaHiveDecimalObjectInspector =>
(o: Any) => HiveShim.createDecimal(o.asInstanceOf[BigDecimal].underlying())

case soi: StandardStructObjectInspector =>
val wrappers = soi.getAllStructFieldRefs.map(ref => wrapperFor(ref.getFieldObjectInspector))
(o: Any) => {
val struct = soi.create()
(soi.getAllStructFieldRefs, wrappers, o.asInstanceOf[Row].toSeq).zipped.foreach {
(field, wrapper, data) => soi.setStructFieldData(struct, field, wrapper(data))
}
struct
}

case loi: ListObjectInspector =>
val wrapper = wrapperFor(loi.getListElementObjectInspector)
(o: Any) => seqAsJavaList(o.asInstanceOf[Seq[_]].map(wrapper))

case moi: MapObjectInspector =>
val keyWrapper = wrapperFor(moi.getMapKeyObjectInspector)
val valueWrapper = wrapperFor(moi.getMapValueObjectInspector)
(o: Any) => mapAsJavaMap(o.asInstanceOf[Map[_, _]].map { case (key, value) =>
keyWrapper(key) -> valueWrapper(value)
})

case _ =>
identity[Any]
}
def wrappers(oi: ObjectInspector): Any => Any = wrapperFor(oi)
}
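The rewrite above collapses the hand-rolled wrappers dispatch into a single delegation to HiveInspectors.wrapperFor, which already builds the element converters once up front and recurses through struct, list, and map inspectors. As a rough illustration of that pattern — a self-contained sketch with a hypothetical Inspector ADT, not the actual Hive ObjectInspector API — the recursion looks like this:

// Sketch of the recursive converter-building pattern that wrapperFor implements.
// The Inspector ADT below is hypothetical; Hive's real ObjectInspector hierarchy
// is far richer, but the shape of the recursion is the same.
sealed trait Inspector
case object StringInspector extends Inspector
case class ListInspector(element: Inspector) extends Inspector
case class MapInspector(key: Inspector, value: Inspector) extends Inspector

object WrapperSketch {
  def wrapperFor(oi: Inspector): Any => Any = oi match {
    case StringInspector => identity
    case ListInspector(elem) =>
      // Build the element converter once, then reuse it for every value.
      val wrap = wrapperFor(elem)
      (o: Any) => o.asInstanceOf[Seq[_]].map(wrap)
    case MapInspector(k, v) =>
      val (keyWrap, valueWrap) = (wrapperFor(k), wrapperFor(v))
      (o: Any) => o.asInstanceOf[Map[_, _]].map {
        case (key, value) => keyWrap(key) -> valueWrap(value)
      }
  }
}

Keeping a single implementation avoids the two copies drifting apart, which is exactly the consolidation the TODO at the top of this file calls for.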
@@ -17,12 +17,11 @@

package org.apache.spark.sql.hive.orc

import java.io.IOException

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hive.ql.io.orc.{OrcFile, Reader}
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector

import org.apache.spark.Logging
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.sql.hive.HiveMetastoreTypes
@@ -31,7 +30,7 @@ import org.apache.spark.sql.types.StructType
private[orc] object OrcFileOperator extends Logging{

def getFileReader(pathStr: String, config: Option[Configuration] = None ): Reader = {
var conf = config.getOrElse(new Configuration)
val conf = config.getOrElse(new Configuration)
val fspath = new Path(pathStr)
val fs = fspath.getFileSystem(conf)
val orcFiles = listOrcFiles(pathStr, conf)
@@ -53,19 +52,6 @@ private[orc] object OrcFileOperator extends Logging{
readerInspector
}

def deletePath(pathStr: String, conf: Configuration): Unit = {
val fspath = new Path(pathStr)
val fs = fspath.getFileSystem(conf)
try {
fs.delete(fspath, true)
} catch {
case e: IOException =>
throw new IOException(
s"Unable to clear output directory ${fspath.toString} prior"
+ s" to InsertIntoOrcTable:\n${e.toString}")
}
}

def listOrcFiles(pathStr: String, conf: Configuration): Seq[Path] = {
val origPath = new Path(pathStr)
val fs = origPath.getFileSystem(conf)
@@ -80,8 +66,6 @@ private[orc] object OrcFileOperator extends Logging{
throw new IllegalArgumentException(
s"orcFileOperator: path $path does not have valid orc files matching the pattern")
}
logInfo("Qualified file list: ")
paths.foreach{x=>logInfo(x.toString)}
paths
}
}
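With deletePath and the verbose file-list logging removed, OrcFileOperator is down to reader construction and file listing. A minimal usage sketch — the warehouse path is hypothetical, and it assumes at least one ORC file exists under it:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector

val conf = new Configuration()
// getFileReader lists the ORC files under the path and opens one of them
// with the Hive Reader API.
val reader = OrcFileOperator.getFileReader("/user/hive/warehouse/orc_table", Some(conf))
// The reader's object inspector describes the file schema,
// e.g. struct<key:int,value:string>.
val inspector = reader.getObjectInspector.asInstanceOf[StructObjectInspector]
println(inspector.getTypeName)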
133 changes: 44 additions & 89 deletions sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala
@@ -22,100 +22,55 @@ import org.apache.hadoop.hive.ql.io.sarg.SearchArgument.Builder
import org.apache.spark.Logging
import org.apache.spark.sql.sources._

private[sql] object OrcFilters extends Logging {

/**
* This could be optimized by pushing down partial filters, but we are conservative here:
* if any filter fails to be converted, the partially built tree may be corrupted and
* can no longer be used.
*/
private[orc] object OrcFilters extends Logging {
def createFilter(expr: Array[Filter]): Option[SearchArgument] = {
if (expr == null || expr.size == 0) return None
var sarg: Option[Builder] = Some(SearchArgument.FACTORY.newBuilder())
sarg.get.startAnd()
expr.foreach {
x => {
sarg match {
case Some(s1) => sarg = createFilter(x, s1)
case _ => None
}
}
}
sarg match {
case Some(b) => Some(b.end.build)
case _ => None
if (expr.nonEmpty) {
expr.foldLeft(Some(SearchArgument.FACTORY.newBuilder().startAnd()): Option[Builder]) {
(maybeBuilder, e) => createFilter(e, maybeBuilder)
}.map(_.end().build())
} else {
None
}
}

def createFilter(expression: Filter, builder: Builder): Option[Builder] = {
expression match {
case p@And(left: Filter, right: Filter) => {
val b1 = builder.startAnd()
val b2 = createFilter(left, b1)
b2 match {
case Some(b) => val b3 = createFilter(right, b)
if (b3.isDefined) {
Some(b3.get.end)
} else {
None
}
case _ => None
}
}
case p@Or(left: Filter, right: Filter) => {
val b1 = builder.startOr()
val b2 = createFilter(left, b1)
b2 match {
case Some(b) => val b3 = createFilter(right, b)
if (b3.isDefined) {
Some(b3.get.end)
} else {
None
}
case _ => None
}
}
case p@Not(child: Filter) => {
val b1 = builder.startNot()
val b2 = createFilter(child, b1)
b2 match {
case Some(b) => Some(b.end)
case _ => None
}
}
case p@EqualTo(attribute: String, value: Any) => {
val b1 = builder.equals(attribute, value)
Some(b1)
}
case p@LessThan(attribute: String, value: Any) => {
val b1 = builder.lessThan(attribute ,value)
Some(b1)
}
case p@LessThanOrEqual(attribute: String, value: Any) => {
val b1 = builder.lessThanEquals(attribute, value)
Some(b1)
}
case p@GreaterThan(attribute: String, value: Any) => {
val b1 = builder.startNot().lessThanEquals(attribute, value).end()
Some(b1)
}
case p@GreaterThanOrEqual(attribute: String, value: Any) => {
val b1 = builder.startNot().lessThan(attribute, value).end()
Some(b1)
}
case p@IsNull(attribute: String) => {
val b1 = builder.isNull(attribute)
Some(b1)
}
case p@IsNotNull(attribute: String) => {
val b1 = builder.startNot().isNull(attribute).end()
Some(b1)
}
case p@In(attribute: String, values: Array[Any]) => {
val b1 = builder.in(attribute, values)
Some(b1)
private def createFilter(expression: Filter, maybeBuilder: Option[Builder]): Option[Builder] = {
maybeBuilder.flatMap { builder =>
expression match {
case p@And(left, right) =>
for {
lhs <- createFilter(left, Some(builder.startAnd()))
rhs <- createFilter(right, Some(lhs))
} yield rhs.end()
case p@Or(left, right) =>
for {
lhs <- createFilter(left, Some(builder.startOr()))
rhs <- createFilter(right, Some(lhs))
} yield rhs.end()
case p@Not(child) =>
createFilter(child, Some(builder.startNot())).map(_.end())
case p@EqualTo(attribute, value) =>
Some(builder.equals(attribute, value))
case p@LessThan(attribute, value) =>
Some(builder.lessThan(attribute, value))
case p@LessThanOrEqual(attribute, value) =>
Some(builder.lessThanEquals(attribute, value))
case p@GreaterThan(attribute, value) =>
Some(builder.startNot().lessThanEquals(attribute, value).end())
case p@GreaterThanOrEqual(attribute, value) =>
Some(builder.startNot().lessThan(attribute, value).end())
case p@IsNull(attribute) =>
Some(builder.isNull(attribute))
case p@IsNotNull(attribute) =>
Some(builder.startNot().isNull(attribute).end())
case p@In(attribute, values) =>
Some(builder.in(attribute, values))
case _ => None
}
// not supported in filter
// case p@EqualNullSafe(left: String, right: String) => {
// val b1 = builder.nullSafeEquals(left, right)
// Some(b1)
// }
case _ => None
}
}
}
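Because the fold threads an Option[Builder] through every predicate, a single unsupported filter (the final case _ => None) collapses the whole result to None — the conservative behavior the scaladoc describes. A usage sketch, assuming a caller inside org.apache.spark.sql.hive.orc (the object is private[orc]) and hypothetical columns age and name:

import org.apache.spark.sql.sources.{EqualTo, Filter, GreaterThan}

// Both predicates have supported conversions, so this yields
// Some(sarg) equivalent to: age > 21 AND name = 'foo'.
val supported: Array[Filter] = Array(GreaterThan("age", 21), EqualTo("name", "foo"))
val sarg = OrcFilters.createFilter(supported)

// Had any element hit the `case _ => None` branch, the foldLeft would have
// propagated None and discarded the entire SearchArgument rather than risk
// a half-built filter tree.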