diff --git a/assembly/build.gradle b/assembly/build.gradle
index 63db32e3e41f6..0d81f789eb05b 100644
--- a/assembly/build.gradle
+++ b/assembly/build.gradle
@@ -27,6 +27,7 @@ dependencies {
   compile project(subprojectBase + 'snappy-spark-streaming_' + scalaBinaryVersion)
   compile project(subprojectBase + 'snappy-spark-streaming-kafka-0.8_' + scalaBinaryVersion)
   compile project(subprojectBase + 'snappy-spark-streaming-kafka-0.10_' + scalaBinaryVersion)
+  compile project(subprojectBase + 'snappy-spark-sql-kafka-0.10_' + scalaBinaryVersion)
   compile project(subprojectBase + 'snappy-spark-yarn_' + scalaBinaryVersion)
   compile project(subprojectBase + 'snappy-spark-mllib_' + scalaBinaryVersion)
   compile project(subprojectBase + 'snappy-spark-graphx_' + scalaBinaryVersion)
diff --git a/core/build.gradle b/core/build.gradle
index 32a55761bbec3..ebeff567df64d 100644
--- a/core/build.gradle
+++ b/core/build.gradle
@@ -129,7 +129,7 @@ dependencies {
   }
   compile group: 'org.apache.ivy', name: 'ivy', version: '2.4.0'
   compile group: 'oro', name: 'oro', version: '2.0.8'
-  compile(group: 'net.razorvine', name: 'pyrolite', version: '4.9') {
+  compile(group: 'net.razorvine', name: 'pyrolite', version: '4.13') {
     exclude(group: 'net.razorvine', module: 'serpent')
   }
   compile group: 'net.sf.py4j', name: 'py4j', version: '0.10.1'
diff --git a/external/kafka-0-10-sql/build.gradle b/external/kafka-0-10-sql/build.gradle
new file mode 100644
index 0000000000000..45108783e735d
--- /dev/null
+++ b/external/kafka-0-10-sql/build.gradle
@@ -0,0 +1,33 @@
+/*
+ * Copyright (c) 2016 SnappyData, Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you
+ * may not use this file except in compliance with the License. You
+ * may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License. See accompanying
+ * LICENSE file.
+ */
+
+description = 'Kafka 0.10 Source for Structured Streaming'
+
+dependencies {
+  compile project(subprojectBase + 'snappy-spark-tags_' + scalaBinaryVersion)
+  provided project(subprojectBase + 'snappy-spark-core_' + scalaBinaryVersion)
+  provided project(subprojectBase + 'snappy-spark-catalyst_' + scalaBinaryVersion)
+  provided project(subprojectBase + 'snappy-spark-sql_' + scalaBinaryVersion)
+
+  compile group: 'org.apache.kafka', name: 'kafka-clients', version: '0.10.0.1'
+
+  testCompile project(path: subprojectBase + 'snappy-spark-core_' + scalaBinaryVersion, configuration: 'testOutput')
+  testCompile project(path: subprojectBase + 'snappy-spark-catalyst_' + scalaBinaryVersion, configuration: 'testOutput')
+  testCompile project(path: subprojectBase + 'snappy-spark-sql_' + scalaBinaryVersion, configuration: 'testOutput')
+  testCompile group: 'org.apache.kafka', name: 'kafka_' + scalaBinaryVersion, version: '0.10.0.1'
+  testCompile group: 'net.sf.jopt-simple', name: 'jopt-simple', version: '3.2'
+}
diff --git a/external/kafka-0-10/build.gradle b/external/kafka-0-10/build.gradle
index d5dc6611a97f0..9b6db7851f1d4 100644
--- a/external/kafka-0-10/build.gradle
+++ b/external/kafka-0-10/build.gradle
@@ -19,7 +19,8 @@ description = 'Spark Integration for Kafka 0.10'
 
 dependencies {
   compile project(subprojectBase + 'snappy-spark-tags_' + scalaBinaryVersion)
-  compile project(subprojectBase + 'snappy-spark-streaming_' + scalaBinaryVersion)
+  provided project(subprojectBase + 'snappy-spark-core_' + scalaBinaryVersion)
+  provided project(subprojectBase + 'snappy-spark-streaming_' + scalaBinaryVersion)
 
   compile(group: 'org.apache.kafka', name: 'kafka_' + scalaBinaryVersion, version: '0.10.0.1') {
     exclude(group: 'com.sun.jmx', module: 'jmxri')
diff --git a/settings.gradle b/settings.gradle
index ca33d18d94bf5..7150a225b3de3 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -40,6 +40,7 @@ include ':snappy-spark-streaming-flume_' + scalaBinaryVersion
 include ':snappy-spark-streaming-flume-sink_' + scalaBinaryVersion
 include ':snappy-spark-streaming-kafka-0.8_' + scalaBinaryVersion
 include ':snappy-spark-streaming-kafka-0.10_' + scalaBinaryVersion
+include ':snappy-spark-sql-kafka-0.10_' + scalaBinaryVersion
 include ':snappy-spark-examples_' + scalaBinaryVersion
 include ':snappy-spark-repl_' + scalaBinaryVersion
 include ':snappy-spark-launcher_' + scalaBinaryVersion
@@ -67,6 +68,7 @@ project(':snappy-spark-streaming-flume_' + scalaBinaryVersion).projectDir = "$ro
 project(':snappy-spark-streaming-flume-sink_' + scalaBinaryVersion).projectDir = "$rootDir/external/flume-sink" as File
 project(':snappy-spark-streaming-kafka-0.8_' + scalaBinaryVersion).projectDir = "$rootDir/external/kafka-0-8" as File
 project(':snappy-spark-streaming-kafka-0.10_' + scalaBinaryVersion).projectDir = "$rootDir/external/kafka-0-10" as File
+project(':snappy-spark-sql-kafka-0.10_' + scalaBinaryVersion).projectDir = "$rootDir/external/kafka-0-10-sql" as File
 project(':snappy-spark-examples_' + scalaBinaryVersion).projectDir = "$rootDir/examples" as File
 project(':snappy-spark-repl_' + scalaBinaryVersion).projectDir = "$rootDir/repl" as File
 project(':snappy-spark-launcher_' + scalaBinaryVersion).projectDir = "$rootDir/launcher" as File
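The build changes above wire a new snappy-spark-sql-kafka-0.10 module (sources under external/kafka-0-10-sql) into the build, the settings, and the assembly. For orientation, here is a minimal sketch of using the Structured Streaming Kafka source that module packages; it follows the stock Spark "kafka" source options, and the broker address, topic name, and local master are placeholder assumptions, not values from this patch.

// Illustrative sketch only: reads a Kafka topic through the "kafka" source
// provided by the new sql-kafka module. Broker, topic and master are placeholders.
import org.apache.spark.sql.SparkSession

object KafkaSqlSourceSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("kafka-sql-source-sketch")
      .master("local[2]")
      .getOrCreate()
    import spark.implicits._

    // Each Kafka record surfaces as binary key/value columns plus
    // topic/partition/offset/timestamp metadata columns.
    val records = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")  // placeholder broker
      .option("subscribe", "events")                         // placeholder topic
      .load()

    val values = records.selectExpr("CAST(value AS STRING)").as[String]

    // Print incoming micro-batches to stdout; console sink is for local testing only.
    val query = values.writeStream
      .format("console")
      .start()

    query.awaitTermination()
  }
}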
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
index 15668c95a7b9a..d95734e06660a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
@@ -196,7 +196,7 @@ case class FilterExec(condition: Expression, child: SparkPlan)
     s"""
        |$generated
        |$nullChecks
-       |$numOutput.add(1);
+       |$numOutput.addLong(1);
        |${consume(ctx, resultVars)}
      """.stripMargin
   }
@@ -289,7 +289,7 @@ case class SampleExec(
       s"""
          | int $samplingCount = $sampler.sample();
          | while ($samplingCount-- > 0) {
-         |   $numOutput.add(1);
+         |   $numOutput.addLong(1);
          |   ${consume(ctx, input)}
          | }
        """.stripMargin.trim
@@ -303,7 +303,7 @@
 
       s"""
          | if ($sampler.sample() == 0) continue;
-         | $numOutput.add(1);
+         | $numOutput.addLong(1);
          | ${consume(ctx, input)}
        """.stripMargin.trim
     }
@@ -384,7 +384,7 @@ case class RangeExec(range: org.apache.spark.sql.catalyst.plans.logical.Range)
         |     $partitionEnd = end.longValue();
         |   }
         |
-        |   $numOutput.add(($partitionEnd - $number) / ${step}L);
+        |   $numOutput.addLong(($partitionEnd - $number) / ${step}L);
         | }
        """.stripMargin)
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
index 8baf0bc6a879c..2d4bf03fd24b0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
@@ -55,6 +55,11 @@ final class SQLMetric(var metricType: String, initValue: Long = 0L)
   override def add(v: Long): Unit = _value += v
 
+  // avoid the runtime generic Object conversion of add(), value()
+  final def addLong(v: Long): Unit = _value += v
+
+  final def longValue: Long = _value
+
   def +=(v: Long): Unit = _value += v
 
   override def value: Long = _value
 
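The SQLMetrics hunk is what the addLong(1) edits in the generated code above rely on. SQLMetric extends Spark's generic AccumulatorV2[Long, Long], and, as the added comment notes, going through the generic add()/value() signatures can involve a runtime conversion to Object (boxing) on every per-row increment; the new addLong and longValue keep the argument and result as primitive longs. The standalone sketch below uses made-up stand-ins (Acc, LongAcc), not Spark classes, to illustrate the two call paths.

// Standalone sketch (not Spark code): why a dedicated primitive-typed method
// avoids boxing. Acc mimics a generic accumulator whose add(v: IN) erases to
// add(Object) on the JVM; LongAcc mirrors the SQLMetric change.
abstract class Acc[IN] {
  def add(v: IN): Unit
}

final class LongAcc extends Acc[Long] {
  private[this] var _value = 0L

  // Overriding the generic method also emits a bridge add(Object) that
  // unboxes its argument before delegating here.
  override def add(v: Long): Unit = _value += v

  // Mirrors SQLMetric.addLong/longValue: no generic dispatch, no boxing.
  def addLong(v: Long): Unit = _value += v
  def longValue: Long = _value
}

object MetricBoxingSketch {
  def main(args: Array[String]): Unit = {
    val metric = new LongAcc

    // Call through the generic supertype: the literal 1L is boxed to
    // java.lang.Long and unboxed again inside the bridge method.
    val generic: Acc[Long] = metric
    generic.add(1L)

    // Direct primitive path, as the generated code now does with addLong(1).
    metric.addLong(1L)

    println(s"value = ${metric.longValue}")  // prints: value = 2
  }
}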