Skip to content

Commit

Permalink
[SPARK-46937][SQL] Improve concurrency performance for FunctionRegistry
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?
This PR proposes to improve the concurrency performance of `FunctionRegistry`.

### Why are the changes needed?
Currently, `SimpleFunctionRegistryBase` uses a `mutable.Map` to cache function infos, and every access is guarded by `this` (via `synchronized`) to ensure thread safety.
Because all of the mutable state lives in `functionBuilders`, we can delegate thread safety to `ConcurrentHashMap`.
`ConcurrentHashMap` allows a higher degree of concurrency and better responsiveness.
After this change, `FunctionRegistry` performs better than before.

### Does this PR introduce _any_ user-facing change?
'No'.

### How was this patch tested?
GA.
The benchmark test.
```
/**
 * Micro-benchmark for concurrent reads of the built-in `FunctionRegistry`.
 *
 * `threadNum` worker threads each perform `iters` iterations; one iteration looks up a
 * randomly chosen built-in function by name and then lists every registered function.
 */
object FunctionRegistryBenchmark extends BenchmarkBase {

  override def runBenchmarkSuite(mainArgs: Array[String]): Unit = {
    runBenchmark("FunctionRegistry") {
      val iters = 1000000
      val threadNum = 4
      val functionRegistry = FunctionRegistry.builtin
      // An IndexedSeq keeps the random `names(i)` access below cheap, so the measurement is
      // dominated by the registry calls rather than by walking the name sequence.
      val names = FunctionRegistry.expressions.keys.toIndexedSeq
      // threadNum workers + the thread driving the benchmark case.
      val barrier = new CyclicBarrier(threadNum + 1)
      val threadPool = ThreadUtils.newDaemonFixedThreadPool(threadNum, "test-function-registry")
      val benchmark = new Benchmark("SimpleFunctionRegistry", iters, output = output)

      try {
        benchmark.addCase("only read") { _ =>
          for (_ <- 1 to threadNum) {
            threadPool.execute(new Runnable {
              val random = new Random()
              override def run(): Unit = {
                // First rendezvous: all workers start reading at the same time.
                barrier.await()
                for (_ <- 1 to iters) {
                  val name = names(random.nextInt(names.size))
                  val fun = functionRegistry.lookupFunction(new FunctionIdentifier(name))
                  // `exists` fails the assertion (instead of throwing from `Option.get`)
                  // when the lookup unexpectedly returns None.
                  assert(fun.exists(_.getName == name))
                  functionRegistry.listFunction()
                }
                // Second rendezvous: signals that this worker has finished.
                barrier.await()
              }
            })
          }
          // The driving thread takes part in both rendezvous, so the case body returns only
          // after every worker has completed its iterations.
          barrier.await()
          barrier.await()
        }

        benchmark.run()
      } finally {
        // Release the worker threads once all benchmark runs are done.
        threadPool.shutdown()
      }
    }
  }
}
```
The benchmark output before this PR.
```
Java HotSpot(TM) 64-Bit Server VM 17.0.9+11-LTS-201 on Mac OS X 10.14.6
Intel(R) Core(TM) i5-5350U CPU  1.80GHz
SimpleFunctionRegistry:                   Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
only read                                         54858          55043         261          0.0       54858.1       1.0X
```
The benchmark output after this PR.
```
Java HotSpot(TM) 64-Bit Server VM 17.0.9+11-LTS-201 on Mac OS X 10.14.6
Intel(R) Core(TM) i5-5350U CPU  1.80GHz
SimpleFunctionRegistry:                   Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
only read                                         20202          20264          88          0.0       20202.1       1.0X
```

### Was this patch authored or co-authored using generative AI tooling?
'No'.

Closes #44976 from beliefer/SPARK-46937.

Authored-by: beliefer <[email protected]>
Signed-off-by: beliefer <[email protected]>
  • Loading branch information
beliefer committed Jun 4, 2024
1 parent abbe301 commit 8cebb9b
Showing 1 changed file with 26 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@
package org.apache.spark.sql.catalyst.analysis

import java.util.Locale
import javax.annotation.concurrent.GuardedBy
import java.util.concurrent.ConcurrentHashMap

import scala.collection.mutable
import scala.jdk.CollectionConverters._
import scala.reflect.ClassTag

import org.apache.spark.SparkUnsupportedOperationException
Expand Down Expand Up @@ -195,9 +195,8 @@ object FunctionRegistryBase {

trait SimpleFunctionRegistryBase[T] extends FunctionRegistryBase[T] with Logging {

// Registry storage: maps a function identifier to its (ExpressionInfo, FunctionBuilder) pair.
// NOTE(review): this is a diff view without +/- markers; per the commit description the
// mutable.HashMap initializer is the old line and the ConcurrentHashMap initializer is the
// new one.
@GuardedBy("this")
protected val functionBuilders =
new mutable.HashMap[FunctionIdentifier, (ExpressionInfo, FunctionBuilder)]
new ConcurrentHashMap[FunctionIdentifier, (ExpressionInfo, FunctionBuilder)]

// Resolution of the function name is always case insensitive, but the database name
// depends on the caller
Expand All @@ -220,45 +219,36 @@ trait SimpleFunctionRegistryBase[T] extends FunctionRegistryBase[T] with Logging
// Stores (info, builder) under `name` and logs a warning when that replaces a different,
// previously registered function.
// NOTE(review): diff view without +/- markers; old and new lines are interleaved below.
def internalRegisterFunction(
name: FunctionIdentifier,
info: ExpressionInfo,
builder: FunctionBuilder): Unit = synchronized {
builder: FunctionBuilder): Unit = {
val newFunction = (info, builder)
functionBuilders.put(name, newFunction) match {
case Some(previousFunction) if previousFunction != newFunction =>
// ConcurrentHashMap.put returns null (not an Option) when the key had no previous mapping.
// Without the explicit null check, `null != newFunction` is true and every first-time
// registration would log a spurious "replaced" warning.
case previousFunction if previousFunction != null && previousFunction != newFunction =>
logWarning(log"The function ${MDC(FUNCTION_NAME, name)} replaced a " +
log"previously registered function.")
case _ =>
}
}

// Finds the builder stored under the normalized `name` and applies it to `children`.
// Throws the unresolved-routine error (search path "system.builtin") when nothing is
// registered under that name.
// NOTE(review): diff view without +/- markers; old and new lines are interleaved below.
override def lookupFunction(name: FunctionIdentifier, children: Seq[Expression]): T = {
val func = synchronized {
functionBuilders.get(normalizeFuncName(name)).map(_._2).getOrElse {
throw QueryCompilationErrors.unresolvedRoutineError(name, Seq("system.builtin"))
}
// Option(...) converts the null that ConcurrentHashMap.get returns for a missing key to None.
val func = Option(functionBuilders.get(normalizeFuncName(name))).map(_._2).getOrElse {
throw QueryCompilationErrors.unresolvedRoutineError(name, Seq("system.builtin"))
}
func(children)
}

// Returns the identifiers (map keys) of all currently registered functions.
// NOTE(review): diff view without +/- markers; old and new lines are interleaved below.
override def listFunction(): Seq[FunctionIdentifier] = synchronized {
functionBuilders.iterator.map(_._1).toList
}
override def listFunction(): Seq[FunctionIdentifier] =
functionBuilders.keys().asScala.toSeq

// Returns the ExpressionInfo stored under the normalized `name`, or None when absent.
// NOTE(review): diff view without +/- markers; old and new lines are interleaved below.
override def lookupFunction(name: FunctionIdentifier): Option[ExpressionInfo] = synchronized {
functionBuilders.get(normalizeFuncName(name)).map(_._1)
}
override def lookupFunction(name: FunctionIdentifier): Option[ExpressionInfo] =
// Option(...) maps the null returned by ConcurrentHashMap.get for a missing key to None.
Option(functionBuilders.get(normalizeFuncName(name))).map(_._1)

// Returns the FunctionBuilder stored under the normalized `name`, or None when absent.
// NOTE(review): diff view without +/- markers; old and new lines are interleaved below.
override def lookupFunctionBuilder(
name: FunctionIdentifier): Option[FunctionBuilder] = synchronized {
functionBuilders.get(normalizeFuncName(name)).map(_._2)
}
override def lookupFunctionBuilder(name: FunctionIdentifier): Option[FunctionBuilder] =
// Option(...) maps the null returned by ConcurrentHashMap.get for a missing key to None.
Option(functionBuilders.get(normalizeFuncName(name))).map(_._2)

// Removes the entry stored under the normalized `name`; returns true if an entry was removed.
// NOTE(review): diff view without +/- markers; old and new lines are interleaved below.
override def dropFunction(name: FunctionIdentifier): Boolean = synchronized {
functionBuilders.remove(normalizeFuncName(name)).isDefined
}
override def dropFunction(name: FunctionIdentifier): Boolean =
// ConcurrentHashMap.remove returns null when the key was absent, hence the Option wrapper.
Option(functionBuilders.remove(normalizeFuncName(name))).isDefined

// Removes every registered function from `functionBuilders`.
// NOTE(review): diff view without +/- markers; old and new lines are interleaved below.
override def clear(): Unit = synchronized {
functionBuilders.clear()
}
override def clear(): Unit = functionBuilders.clear()
}

/**
Expand Down Expand Up @@ -308,7 +298,11 @@ class SimpleFunctionRegistry

override def clone(): SimpleFunctionRegistry = synchronized {
val registry = new SimpleFunctionRegistry
functionBuilders.iterator.foreach { case (name, (info, builder)) =>
val iterator = functionBuilders.entrySet().iterator()
while (iterator.hasNext) {
val entry = iterator.next()
val name = entry.getKey
val (info, builder) = entry.getValue
registry.internalRegisterFunction(name, info, builder)
}
registry
Expand Down Expand Up @@ -1036,7 +1030,11 @@ class SimpleTableFunctionRegistry extends SimpleFunctionRegistryBase[LogicalPlan

override def clone(): SimpleTableFunctionRegistry = synchronized {
val registry = new SimpleTableFunctionRegistry
functionBuilders.iterator.foreach { case (name, (info, builder)) =>
val iterator = functionBuilders.entrySet().iterator()
while (iterator.hasNext) {
val entry = iterator.next()
val name = entry.getKey
val (info, builder) = entry.getValue
registry.internalRegisterFunction(name, info, builder)
}
registry
Expand Down

0 comments on commit 8cebb9b

Please sign in to comment.