diff --git a/examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQL.java b/examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQL.java new file mode 100644 index 0000000000000..dd92addf5d1d2 --- /dev/null +++ b/examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQL.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.examples.sql; + +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.api.java.function.VoidFunction; + +import org.apache.spark.sql.api.java.JavaSQLContext; +import org.apache.spark.sql.api.java.JavaSchemaRDD; +import org.apache.spark.sql.api.java.JavaRow; + +public final class JavaSparkSQL { + public static void main(String[] args) throws Exception { + JavaSparkContext ctx = new JavaSparkContext("local", "JavaSparkSQL", + System.getenv("SPARK_HOME"), JavaSparkContext.jarOfClass(JavaSparkSQL.class)); + JavaSQLContext sqlCtx = new JavaSQLContext(ctx); + + JavaSchemaRDD parquetFile = sqlCtx.parquetFile("pair.parquet"); + parquetFile.registerAsTable("parquet"); + + JavaSchemaRDD queryResult = sqlCtx.sql("SELECT * FROM parquet"); + queryResult.foreach(new VoidFunction() { + @Override + public void call(JavaRow row) throws Exception { + System.out.println(row.get(0) + " " + row.get(1)); + } + }); + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala index 770cabcb31d13..a62cb8aa1321f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala @@ -17,13 +17,13 @@ package org.apache.spark.sql +import org.apache.spark.{Dependency, OneToOneDependency, Partition, TaskContext} import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.analysis._ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.plans.{Inner, JoinType} import org.apache.spark.sql.catalyst.types.BooleanType -import org.apache.spark.{Dependency, OneToOneDependency, Partition, TaskContext} /** * ALPHA COMPONENT @@ -92,23 +92,10 @@ import org.apache.spark.{Dependency, OneToOneDependency, Partition, TaskContext} */ class SchemaRDD( @transient val sqlContext: SQLContext, - @transient val logicalPlan: LogicalPlan) - extends RDD[Row](sqlContext.sparkContext, Nil) { + @transient protected[spark] val logicalPlan: LogicalPlan) + extends RDD[Row](sqlContext.sparkContext, Nil) with SchemaRDDLike { - /** - * A lazily computed query execution workflow. All other RDD operations are passed - * through to the RDD that is produced by this workflow. - * - * We want this to be lazy because invoking the whole query optimization pipeline can be - * expensive. - */ - @transient - protected[spark] lazy val queryExecution = sqlContext.executePlan(logicalPlan) - - override def toString = - s"""${super.toString} - |== Query Plan == - |${queryExecution.executedPlan}""".stripMargin.trim + def baseSchemaRDD = this // ========================================================================================= // RDD functions: Copy the interal row representation so we present immutable data to users. @@ -312,31 +299,12 @@ class SchemaRDD( sqlContext, InsertIntoTable(UnresolvedRelation(None, tableName), Map.empty, logicalPlan, overwrite)) - /** - * Saves the contents of this `SchemaRDD` as a parquet file, preserving the schema. Files that - * are written out using this method can be read back in as a SchemaRDD using the ``function - * - * @group schema - */ - def saveAsParquetFile(path: String): Unit = { - sqlContext.executePlan(WriteToFile(path, logicalPlan)).toRdd - } - - /** - * Registers this RDD as a temporary table using the given name. The lifetime of this temporary - * table is tied to the [[SQLContext]] that was used to create this SchemaRDD. - * - * @group schema - */ - def registerAsTable(tableName: String): Unit = { - sqlContext.registerRDDAsTable(this, tableName) - } - /** * Returns this RDD as a SchemaRDD. * @group schema */ def toSchemaRDD = this + /** FOR INTERNAL USE ONLY */ def analyze = sqlContext.analyzer(logicalPlan) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDDLike.scala b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDDLike.scala new file mode 100644 index 0000000000000..840803a52c1cf --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDDLike.scala @@ -0,0 +1,66 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. +* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.spark.sql + +import org.apache.spark.sql.catalyst.plans.logical._ + +/** + * Contains functions that are shared between all SchemaRDD types (i.e., Scala, Java) + */ +trait SchemaRDDLike { + @transient val sqlContext: SQLContext + @transient protected[spark] val logicalPlan: LogicalPlan + + private[sql] def baseSchemaRDD: SchemaRDD + + /** + * A lazily computed query execution workflow. All other RDD operations are passed + * through to the RDD that is produced by this workflow. + * + * We want this to be lazy because invoking the whole query optimization pipeline can be + * expensive. + */ + @transient + protected[spark] lazy val queryExecution = sqlContext.executePlan(logicalPlan) + + override def toString = + s"""${super.toString} + |== Query Plan == + |${queryExecution.executedPlan}""".stripMargin.trim + + + /** + * Saves the contents of this `SchemaRDD` as a parquet file, preserving the schema. Files that + * are written out using this method can be read back in as a SchemaRDD using the ``function + * + * @group schema + */ + def saveAsParquetFile(path: String): Unit = { + sqlContext.executePlan(WriteToFile(path, logicalPlan)).toRdd + } + + /** + * Registers this RDD as a temporary table using the given name. The lifetime of this temporary + * table is tied to the [[SQLContext]] that was used to create this SchemaRDD. + * + * @group schema + */ + def registerAsTable(tableName: String): Unit = { + sqlContext.registerRDDAsTable(baseSchemaRDD, tableName) + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala new file mode 100644 index 0000000000000..303dca11dca1a --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala @@ -0,0 +1,51 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. +* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.spark.sql.api.java + +import org.apache.spark.api.java.JavaSparkContext + +import org.apache.spark.sql._ + +class JavaSQLContext(sparkContext: JavaSparkContext) { + + val sqlContext = new SQLContext(sparkContext) + + def sql(sqlQuery: String): JavaSchemaRDD = { + val result = new JavaSchemaRDD(sqlContext, sqlContext.parseSql(sqlQuery)) + // We force query optimization to happen right away instead of letting it happen lazily like + // when using the query DSL. This is so DDL commands behave as expected. This is only + // generates the RDD lineage for DML queries, but do not perform any execution. + result.queryExecution.toRdd + result + } + + /** + * Loads a parquet file, returning the result as a [[SchemaRDD]]. + */ + def parquetFile(path: String): JavaSchemaRDD = + new JavaSchemaRDD(sqlContext, parquet.ParquetRelation("ParquetFile", path)) + + + /** + * Registers the given RDD as a temporary table in the catalog. Temporary tables exist only + * during the lifetime of this instance of SQLContext. + */ + def registerRDDAsTable(rdd: JavaSchemaRDD, tableName: String): Unit = { + sqlContext.registerRDDAsTable(rdd.baseSchemaRDD, tableName) + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSchemaRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSchemaRDD.scala new file mode 100644 index 0000000000000..38f92e1689a97 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSchemaRDD.scala @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.api.java + +import org.apache.spark.api.java.{JavaRDDLike, JavaRDD} +import org.apache.spark.sql._ +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.rdd.RDD + +class JavaSchemaRDD( + @transient val sqlContext: SQLContext, + @transient protected[spark] val logicalPlan: LogicalPlan) + extends JavaRDDLike[JavaRow, JavaRDD[JavaRow]] + with SchemaRDDLike { + + private[sql] val baseSchemaRDD = new SchemaRDD(sqlContext, logicalPlan) + + override val classTag = scala.reflect.classTag[JavaRow] + + override def wrapRDD(rdd: RDD[JavaRow]): JavaRDD[JavaRow] = JavaRDD.fromRDD(rdd) + + val rdd = baseSchemaRDD.map(new JavaRow(_)) +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/api/java/Row.scala b/sql/core/src/main/scala/org/apache/spark/sql/api/java/Row.scala new file mode 100644 index 0000000000000..bb7b0f122fbec --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/api/java/Row.scala @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.api.java + +import org.apache.spark.sql.catalyst.expressions.Row + +/** + * A result row from a SparkSQL query. + */ +class JavaRow(row: Row) { + + def length: Int = row.length + + def get(i: Int): Any = + row(i) + + def isNullAt(i: Int) = get(i) == null + + def getInt(i: Int): Int = + row.getInt(i) + + def getLong(i: Int): Long = + row.getLong(i) + + def getDouble(i: Int): Double = + row.getDouble(i) + + def getBoolean(i: Int): Boolean = + row.getBoolean(i) + + def getShort(i: Int): Short = + row.getShort(i) + + def getByte(i: Int): Byte = + row.getByte(i) + + def getFloat(i: Int): Float = + row.getFloat(i) + + def getString(i: Int): String = + row.getString(i) +} +