Skip to content

Commit

Permalink
[SEDONA-405] Replace Metric with LongAccumulator to reduce memory ove…
Browse files Browse the repository at this point in the history
…rhead for spatial join (#1041)
  • Loading branch information
jiayuasu authored Sep 29, 2023
1 parent 16c7972 commit db07953
Show file tree
Hide file tree
Showing 9 changed files with 34 additions and 138 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@
import org.apache.commons.lang3.tuple.Pair;
import org.apache.sedona.core.enums.IndexType;
import org.apache.sedona.core.enums.JoinBuildSide;
import org.apache.sedona.core.monitoring.Metric;
import org.apache.sedona.core.spatialOperator.SpatialPredicate;
import org.apache.sedona.core.utils.TimeUtils;
import org.apache.spark.api.java.function.FlatMapFunction2;
import org.apache.spark.util.LongAccumulator;
import org.locationtech.jts.geom.Envelope;
import org.locationtech.jts.geom.Geometry;
import org.locationtech.jts.index.SpatialIndex;
Expand All @@ -50,10 +50,10 @@ public DynamicIndexLookupJudgement(
SpatialPredicate spatialPredicate,
IndexType indexType,
JoinBuildSide joinBuildSide,
Metric buildCount,
Metric streamCount,
Metric resultCount,
Metric candidateCount)
LongAccumulator buildCount,
LongAccumulator streamCount,
LongAccumulator resultCount,
LongAccumulator candidateCount)
{
super(spatialPredicate, buildCount, streamCount, resultCount, candidateCount);
this.indexType = indexType;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,10 @@
import org.apache.log4j.Level;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.sedona.core.monitoring.Metric;
import org.apache.sedona.core.spatialOperator.SpatialPredicate;
import org.apache.sedona.core.spatialOperator.SpatialPredicateEvaluators;
import org.apache.spark.TaskContext;
import org.apache.spark.util.LongAccumulator;
import org.locationtech.jts.geom.Geometry;
import org.locationtech.jts.index.SpatialIndex;

Expand All @@ -49,10 +49,10 @@ abstract class JudgementBase<T extends Geometry, U extends Geometry>

private final SpatialPredicate spatialPredicate;
private transient SpatialPredicateEvaluators.SpatialPredicateEvaluator evaluator;
protected final Metric buildCount;
protected final Metric streamCount;
protected final Metric resultCount;
protected final Metric candidateCount;
protected final LongAccumulator buildCount;
protected final LongAccumulator streamCount;
protected final LongAccumulator resultCount;
protected final LongAccumulator candidateCount;

private int shapeCnt;

Expand All @@ -69,7 +69,7 @@ abstract class JudgementBase<T extends Geometry, U extends Geometry>
* @param resultCount num of join results
* @param candidateCount num of candidate pairs to be refined by their real geometries
*/
protected JudgementBase(SpatialPredicate spatialPredicate, Metric buildCount, Metric streamCount, Metric resultCount, Metric candidateCount)
protected JudgementBase(SpatialPredicate spatialPredicate, LongAccumulator buildCount, LongAccumulator streamCount, LongAccumulator resultCount, LongAccumulator candidateCount)
{
this.spatialPredicate = spatialPredicate;
this.buildCount = buildCount;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@
package org.apache.sedona.core.joinJudgement;

import org.apache.commons.lang3.tuple.Pair;
import org.apache.sedona.core.monitoring.Metric;
import org.apache.sedona.core.spatialOperator.SpatialPredicate;
import org.apache.spark.api.java.function.FlatMapFunction2;
import org.apache.spark.util.LongAccumulator;
import org.locationtech.jts.geom.Geometry;
import org.locationtech.jts.index.SpatialIndex;

Expand All @@ -39,10 +39,10 @@ public class LeftIndexLookupJudgement<T extends Geometry, U extends Geometry>
* @see JudgementBase
*/
public LeftIndexLookupJudgement(SpatialPredicate spatialPredicate,
Metric buildCount,
Metric streamCount,
Metric resultCount,
Metric candidateCount)
LongAccumulator buildCount,
LongAccumulator streamCount,
LongAccumulator resultCount,
LongAccumulator candidateCount)
{
super(spatialPredicate, buildCount, streamCount, resultCount, candidateCount);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@
package org.apache.sedona.core.joinJudgement;

import org.apache.commons.lang3.tuple.Pair;
import org.apache.sedona.core.monitoring.Metric;
import org.apache.sedona.core.spatialOperator.SpatialPredicate;
import org.apache.spark.api.java.function.FlatMapFunction2;
import org.apache.spark.util.LongAccumulator;
import org.locationtech.jts.geom.Geometry;

import java.io.Serializable;
Expand All @@ -39,10 +39,10 @@ public class NestedLoopJudgement<T extends Geometry, U extends Geometry>
* @see JudgementBase
*/
public NestedLoopJudgement(SpatialPredicate spatialPredicate,
Metric buildCount,
Metric streamCount,
Metric resultCount,
Metric candidateCount)
LongAccumulator buildCount,
LongAccumulator streamCount,
LongAccumulator resultCount,
LongAccumulator candidateCount)
{
super(spatialPredicate, buildCount, streamCount, resultCount, candidateCount);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@
package org.apache.sedona.core.joinJudgement;

import org.apache.commons.lang3.tuple.Pair;
import org.apache.sedona.core.monitoring.Metric;
import org.apache.sedona.core.spatialOperator.SpatialPredicate;
import org.apache.spark.api.java.function.FlatMapFunction2;
import org.apache.spark.util.LongAccumulator;
import org.locationtech.jts.geom.Geometry;
import org.locationtech.jts.index.SpatialIndex;

Expand All @@ -39,10 +39,10 @@ public class RightIndexLookupJudgement<T extends Geometry, U extends Geometry>
* @see JudgementBase
*/
public RightIndexLookupJudgement(SpatialPredicate spatialPredicate,
Metric buildCount,
Metric streamCount,
Metric resultCount,
Metric candidateCount)
LongAccumulator buildCount,
LongAccumulator streamCount,
LongAccumulator resultCount,
LongAccumulator candidateCount)
{
super(spatialPredicate, buildCount, streamCount, resultCount, candidateCount);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import org.apache.sedona.core.enums.IndexType;
import org.apache.sedona.core.enums.JoinBuildSide;
import org.apache.sedona.core.joinJudgement.*;
import org.apache.sedona.core.monitoring.Metric;
import org.apache.sedona.core.monitoring.Metrics;
import org.apache.sedona.core.spatialPartitioning.SpatialPartitioner;
import org.apache.sedona.core.spatialRDD.CircleRDD;
Expand All @@ -38,6 +37,7 @@
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.util.LongAccumulator;
import org.locationtech.jts.geom.Geometry;
import scala.Tuple2;

Expand Down Expand Up @@ -535,10 +535,10 @@ public static <U extends Geometry, T extends Geometry> JavaPairRDD<U, T> spatial
verifyPartitioningMatch(leftRDD, rightRDD);

SparkContext sparkContext = leftRDD.spatialPartitionedRDD.context();
Metric buildCount = Metrics.createMetric(sparkContext, "buildCount");
Metric streamCount = Metrics.createMetric(sparkContext, "streamCount");
Metric resultCount = Metrics.createMetric(sparkContext, "resultCount");
Metric candidateCount = Metrics.createMetric(sparkContext, "candidateCount");
LongAccumulator buildCount = Metrics.createMetric(sparkContext, "buildCount");
LongAccumulator streamCount = Metrics.createMetric(sparkContext, "streamCount");
LongAccumulator resultCount = Metrics.createMetric(sparkContext, "resultCount");
LongAccumulator candidateCount = Metrics.createMetric(sparkContext, "candidateCount");

final SpatialPartitioner partitioner =
(SpatialPartitioner) rightRDD.spatialPartitionedRDD.partitioner().get();
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,11 @@
package org.apache.sedona.core.monitoring

import org.apache.spark.SparkContext
import org.apache.spark.util.LongAccumulator

object Metrics {
def createMetric(sc: SparkContext, name: String): Metric = {
val acc = new Metric()
def createMetric(sc: SparkContext, name: String): LongAccumulator = {
val acc = new LongAccumulator()
sc.register(acc, "sedona.spatialjoin." + name)
acc
}
Expand Down

This file was deleted.

0 comments on commit db07953

Please sign in to comment.