Skip to content

Commit

Permalink
[SNAP-1194] explicit addLong/longValue methods in SQLMetrics (#33)
Browse files Browse the repository at this point in the history
This avoids runtime erasure for add/value methods that will result in unnecessary boxing/unboxing overheads.

- Adding spark-kafka-sql project
- Update version of deps as per upstream.
- corrected kafka-clients reference
  • Loading branch information
Sumedh Wale authored Dec 3, 2016
1 parent 58cdae0 commit c64b6ca
Show file tree
Hide file tree
Showing 7 changed files with 48 additions and 6 deletions.
1 change: 1 addition & 0 deletions assembly/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ dependencies {
compile project(subprojectBase + 'snappy-spark-streaming_' + scalaBinaryVersion)
compile project(subprojectBase + 'snappy-spark-streaming-kafka-0.8_' + scalaBinaryVersion)
compile project(subprojectBase + 'snappy-spark-streaming-kafka-0.10_' + scalaBinaryVersion)
compile project(subprojectBase + 'snappy-spark-sql-kafka-0.10_' + scalaBinaryVersion)
compile project(subprojectBase + 'snappy-spark-yarn_' + scalaBinaryVersion)
compile project(subprojectBase + 'snappy-spark-mllib_' + scalaBinaryVersion)
compile project(subprojectBase + 'snappy-spark-graphx_' + scalaBinaryVersion)
Expand Down
2 changes: 1 addition & 1 deletion core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ dependencies {
}
compile group: 'org.apache.ivy', name: 'ivy', version: '2.4.0'
compile group: 'oro', name: 'oro', version: '2.0.8'
compile(group: 'net.razorvine', name: 'pyrolite', version: '4.9') {
compile(group: 'net.razorvine', name: 'pyrolite', version: '4.13') {
exclude(group: 'net.razorvine', module: 'serpent')
}
compile group: 'net.sf.py4j', name: 'py4j', version: '0.10.1'
Expand Down
33 changes: 33 additions & 0 deletions external/kafka-0-10-sql/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Copyright (c) 2016 SnappyData, Inc. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you
* may not use this file except in compliance with the License. You
* may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied. See the License for the specific language governing
* permissions and limitations under the License. See accompanying
* LICENSE file.
*/

description = 'Kafka 0.10 Source for Structured Streaming'

dependencies {
compile project(subprojectBase + 'snappy-spark-tags_' + scalaBinaryVersion)
provided project(subprojectBase + 'snappy-spark-core_' + scalaBinaryVersion)
provided project(subprojectBase + 'snappy-spark-catalyst_' + scalaBinaryVersion)
provided project(subprojectBase + 'snappy-spark-sql_' + scalaBinaryVersion)

compile group: 'org.apache.kafka', name: 'kafka-clients', version: '0.10.0.1'

testCompile project(path: subprojectBase + 'snappy-spark-core_' + scalaBinaryVersion, configuration: 'testOutput')
testCompile project(path: subprojectBase + 'snappy-spark-catalyst_' + scalaBinaryVersion, configuration: 'testOutput')
testCompile project(path: subprojectBase + 'snappy-spark-sql_' + scalaBinaryVersion, configuration: 'testOutput')
testCompile group: 'org.apache.kafka', name: 'kafka_' + scalaBinaryVersion, version: '0.10.0.1'
testCompile group: 'net.sf.jopt-simple', name: 'jopt-simple', version: '3.2'
}
3 changes: 2 additions & 1 deletion external/kafka-0-10/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ description = 'Spark Integration for Kafka 0.10'

dependencies {
compile project(subprojectBase + 'snappy-spark-tags_' + scalaBinaryVersion)
compile project(subprojectBase + 'snappy-spark-streaming_' + scalaBinaryVersion)
provided project(subprojectBase + 'snappy-spark-core_' + scalaBinaryVersion)
provided project(subprojectBase + 'snappy-spark-streaming_' + scalaBinaryVersion)

compile(group: 'org.apache.kafka', name: 'kafka_' + scalaBinaryVersion, version: '0.10.0.1') {
exclude(group: 'com.sun.jmx', module: 'jmxri')
Expand Down
2 changes: 2 additions & 0 deletions settings.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ include ':snappy-spark-streaming-flume_' + scalaBinaryVersion
include ':snappy-spark-streaming-flume-sink_' + scalaBinaryVersion
include ':snappy-spark-streaming-kafka-0.8_' + scalaBinaryVersion
include ':snappy-spark-streaming-kafka-0.10_' + scalaBinaryVersion
include ':snappy-spark-sql-kafka-0.10_' + scalaBinaryVersion
include ':snappy-spark-examples_' + scalaBinaryVersion
include ':snappy-spark-repl_' + scalaBinaryVersion
include ':snappy-spark-launcher_' + scalaBinaryVersion
Expand Down Expand Up @@ -67,6 +68,7 @@ project(':snappy-spark-streaming-flume_' + scalaBinaryVersion).projectDir = "$ro
project(':snappy-spark-streaming-flume-sink_' + scalaBinaryVersion).projectDir = "$rootDir/external/flume-sink" as File
project(':snappy-spark-streaming-kafka-0.8_' + scalaBinaryVersion).projectDir = "$rootDir/external/kafka-0-8" as File
project(':snappy-spark-streaming-kafka-0.10_' + scalaBinaryVersion).projectDir = "$rootDir/external/kafka-0-10" as File
project(':snappy-spark-sql-kafka-0.10_' + scalaBinaryVersion).projectDir = "$rootDir/external/kafka-0-10-sql" as File
project(':snappy-spark-examples_' + scalaBinaryVersion).projectDir = "$rootDir/examples" as File
project(':snappy-spark-repl_' + scalaBinaryVersion).projectDir = "$rootDir/repl" as File
project(':snappy-spark-launcher_' + scalaBinaryVersion).projectDir = "$rootDir/launcher" as File
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ case class FilterExec(condition: Expression, child: SparkPlan)
s"""
|$generated
|$nullChecks
|$numOutput.add(1);
|$numOutput.addLong(1);
|${consume(ctx, resultVars)}
""".stripMargin
}
Expand Down Expand Up @@ -289,7 +289,7 @@ case class SampleExec(
s"""
| int $samplingCount = $sampler.sample();
| while ($samplingCount-- > 0) {
| $numOutput.add(1);
| $numOutput.addLong(1);
| ${consume(ctx, input)}
| }
""".stripMargin.trim
Expand All @@ -303,7 +303,7 @@ case class SampleExec(

s"""
| if ($sampler.sample() == 0) continue;
| $numOutput.add(1);
| $numOutput.addLong(1);
| ${consume(ctx, input)}
""".stripMargin.trim
}
Expand Down Expand Up @@ -384,7 +384,7 @@ case class RangeExec(range: org.apache.spark.sql.catalyst.plans.logical.Range)
| $partitionEnd = end.longValue();
| }
|
| $numOutput.add(($partitionEnd - $number) / ${step}L);
| $numOutput.addLong(($partitionEnd - $number) / ${step}L);
| }
""".stripMargin)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,11 @@ final class SQLMetric(var metricType: String, initValue: Long = 0L)

override def add(v: Long): Unit = _value += v

// avoid the runtime generic Object conversion of add(), value()
final def addLong(v: Long): Unit = _value += v

final def longValue: Long = _value

def +=(v: Long): Unit = _value += v

override def value: Long = _value
Expand Down

0 comments on commit c64b6ca

Please sign in to comment.