[SPARK-46937][SQL] Improve concurrency performance for FunctionRegistry #44976

Closed

beliefer
Contributor

@beliefer beliefer commented Feb 1, 2024

What changes were proposed in this pull request?

This PR proposes to improve the concurrency performance of FunctionRegistry.

Why are the changes needed?

Currently, SimpleFunctionRegistryBase uses a mutable.Map to cache function infos, and every access is guarded by synchronizing on this to ensure thread safety.
Because all the mutable state is related to functionBuilders, we can delegate thread safety to ConcurrentHashMap.
ConcurrentHashMap allows a higher level of concurrency and is more responsive under contention.
After this change, FunctionRegistry performs better than before.
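
As a rough illustration of the change described above, the sketch below contrasts the two strategies. `SynchronizedRegistry` and `ConcurrentRegistry` are hypothetical, simplified classes (plain strings stand in for Spark's function builders); this is not the code in the PR.

```scala
import java.util.concurrent.ConcurrentHashMap
import scala.collection.mutable

// Hypothetical, simplified registries: names map to plain strings here,
// not to Spark's (ExpressionInfo, FunctionBuilder) pairs.

// Before: a mutable map where every access locks the registry instance.
class SynchronizedRegistry {
  private val functionBuilders = mutable.HashMap.empty[String, String]

  def register(name: String, builder: String): Unit = synchronized {
    functionBuilders(name) = builder
  }

  def lookup(name: String): Option[String] = synchronized {
    functionBuilders.get(name)
  }
}

// After: thread safety is delegated to ConcurrentHashMap, so lookups take no lock.
class ConcurrentRegistry {
  private val functionBuilders = new ConcurrentHashMap[String, String]

  def register(name: String, builder: String): Unit = {
    functionBuilders.put(name, builder)
  }

  // ConcurrentHashMap#get returns null for a missing key; Option(...) maps that to None.
  def lookup(name: String): Option[String] = Option(functionBuilders.get(name))
}
```

With the first class, concurrent readers queue on one monitor; with the second, individual reads do not block each other, which is presumably where the read-only benchmark gain comes from.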

Does this PR introduce any user-facing change?

'No'.

How was this patch tested?

GA and the benchmark test below.

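import java.util.concurrent.CyclicBarrier

import scala.util.Random

import org.apache.spark.benchmark.{Benchmark, BenchmarkBase}
import org.apache.spark.sql.catalyst.FunctionIdentifier
import org.apache.spark.sql.catalyst.analysis.FunctionRegistry
import org.apache.spark.util.ThreadUtils

object FunctionRegistryBenchmark extends BenchmarkBase {

  override def runBenchmarkSuite(mainArgs: Array[String]): Unit = {
    runBenchmark("FunctionRegistry") {
      val iters = 1000000
      val threadNum = 4
      val functionRegistry = FunctionRegistry.builtin
      val names = FunctionRegistry.expressions.keys.toSeq
      // threadNum workers plus the benchmark thread meet at the barrier.
      val barrier = new CyclicBarrier(threadNum + 1)
      val threadPool = ThreadUtils.newDaemonFixedThreadPool(threadNum, "test-function-registry")
      val benchmark = new Benchmark("SimpleFunctionRegistry", iters, output = output)

      benchmark.addCase("only read") { _ =>
        for (_ <- 1 to threadNum) {
          threadPool.execute(new Runnable {
            val random = new Random()
            override def run(): Unit = {
              // Wait until all workers are ready so they start together.
              barrier.await()
              for (_ <- 1 to iters) {
                // Look up a random built-in function, then list all functions.
                val name = names(random.nextInt(names.size))
                val fun = functionRegistry.lookupFunction(new FunctionIdentifier(name))
                assert(fun.map(_.getName).get == name)
                functionRegistry.listFunction()
              }
              // Signal that this worker has finished.
              barrier.await()
            }
          })
        }
        // First await releases the workers; second waits for all of them to finish.
        barrier.await()
        barrier.await()
      }

      benchmark.run()
    }
  }
}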

The benchmark output before this PR.

Java HotSpot(TM) 64-Bit Server VM 17.0.9+11-LTS-201 on Mac OS X 10.14.6
Intel(R) Core(TM) i5-5350U CPU @ 1.80GHz
SimpleFunctionRegistry:                   Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
only read                                         54858          55043         261          0.0       54858.1       1.0X

The benchmark output after this PR.

Java HotSpot(TM) 64-Bit Server VM 17.0.9+11-LTS-201 on Mac OS X 10.14.6
Intel(R) Core(TM) i5-5350U CPU @ 1.80GHz
SimpleFunctionRegistry:                   Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
only read                                         20202          20264          88          0.0       20202.1       1.0X

Was this patch authored or co-authored using generative AI tooling?

'No'.

@github-actions github-actions bot added the SQL label Feb 1, 2024
@beliefer beliefer changed the title [WIP][SPARK-46937][SQL] Improve concurrency performance for FunctionRegistry [SPARK-46937][SQL] Improve concurrency performance for FunctionRegistry Feb 1, 2024
@beliefer
Contributor Author

beliefer commented Feb 1, 2024

ping @cloud-fan cc @MaxGekk @viirya

@beliefer beliefer requested a review from MaxGekk February 1, 2024 13:03
@cloud-fan
Contributor

What's the level of concurrency you expect for function registration/lookup? Do you have perf numbers?

@beliefer
Contributor Author

beliefer commented Feb 3, 2024

> What's the level of concurrency you expect for function registration/lookup? Do you have perf numbers?

The perf data has been added to the PR description.

Member

@dongjoon-hyun dongjoon-hyun left a comment


Could you rebase this to the master branch once more, @beliefer ?

This approach (and the benchmark result) looks reasonable to me for Apache Spark 4.0.0.

Do you still have any concerns, @cloud-fan ?

@dongjoon-hyun
Member

Could you resolve the conflicts, @beliefer ?

@dongjoon-hyun
Member

I resolved the conflicts for you~

Member

@dongjoon-hyun dongjoon-hyun left a comment


+1, LGTM (Pending CIs).

Also, cc @cloud-fan once more because he had a concern last time.

@beliefer
Contributor Author

Let me rebase again.

@dongjoon-hyun
Member

Thank you, @beliefer

Contributor

@cloud-fan cloud-fan left a comment


Interesting, this change helps even in read-only cases. LGTM then.

@beliefer beliefer closed this in 8cebb9b Jun 4, 2024
@beliefer
Contributor Author

beliefer commented Jun 4, 2024

Merged into master.
Thank you @dongjoon-hyun @cloud-fan @viirya

yaooqinn pushed a commit that referenced this pull request Jun 5, 2024
…cement

### What changes were proposed in this pull request?

A followup of #44976. `ConcurrentHashMap#put` has different semantics from the Scala map's `put`: it returns null if the key is new. We should update the checking code accordingly.
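
To make the difference concrete, here is a minimal sketch of a "was an existing entry replaced?" check written against each map type. The `PutSemantics` object and its method names are hypothetical and only illustrate the return values of the two `put` methods; they are not the checking code in Spark.

```scala
import java.util.concurrent.ConcurrentHashMap
import scala.collection.mutable

// Hypothetical helper, for illustration only.
object PutSemantics {
  // Scala's mutable.Map#put returns Option[V]: None for a new key,
  // Some(previous) when an existing entry is replaced.
  def replacedInScalaMap(map: mutable.Map[String, String], name: String, builder: String): Boolean =
    map.put(name, builder).isDefined

  // ConcurrentHashMap#put returns the previous value, or null for a new key,
  // so the check has to compare against null (or wrap the result in Option).
  def replacedInConcurrentMap(
      map: ConcurrentHashMap[String, String], name: String, builder: String): Boolean =
    map.put(name, builder) != null
}
```

A check that treats any non-`None` result as "replaced" would be wrong for the Java map, since `null` is neither `None` nor a previous value.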

### Why are the changes needed?

avoid wrong warning messages

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

manual

### Was this patch authored or co-authored using generative AI tooling?

no

Closes #46876 from cloud-fan/log.

Authored-by: Wenchen Fan <[email protected]>
Signed-off-by: Kent Yao <[email protected]>
@@ -1032,7 +1026,11 @@ class SimpleTableFunctionRegistry extends SimpleFunctionRegistryBase[LogicalPlan

override def clone(): SimpleTableFunctionRegistry = synchronized {
Contributor


I think there is a problem here. We don't synchronize the write path, so how can we safely clone the ConcurrentHashMap?

Contributor

@tedyu tedyu Jun 13, 2024


We can introduce a ReentrantReadWriteLock guarding the ConcurrentHashMap.
clone and clear would take the write lock on the ConcurrentHashMap.
The other methods would take the read lock on the map.
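
A minimal sketch of this suggestion, assuming a simplified registry: `LockedRegistry`, `withLock`, and `copy` are hypothetical names (the copy method stands in for `clone`), and string values stand in for function builders. It is one possible reading of the proposal, not code from Spark.

```scala
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.locks.{Lock, ReentrantReadWriteLock}

// Hypothetical, simplified registry; not Spark's SimpleFunctionRegistryBase.
class LockedRegistry {
  private val functionBuilders = new ConcurrentHashMap[String, String]
  private val lock = new ReentrantReadWriteLock()

  // Runs `body` while holding `l`, always releasing it afterwards.
  private def withLock[T](l: Lock)(body: => T): T = {
    l.lock()
    try body finally l.unlock()
  }

  // Point operations share the read lock; the map itself handles their concurrency.
  def register(name: String, builder: String): Unit =
    withLock(lock.readLock()) { functionBuilders.put(name, builder); () }

  def lookup(name: String): Option[String] =
    withLock(lock.readLock()) { Option(functionBuilders.get(name)) }

  // Whole-map operations take the write lock, so no update can interleave with them.
  def clear(): Unit = withLock(lock.writeLock()) { functionBuilders.clear() }

  def copy(): LockedRegistry = withLock(lock.writeLock()) {
    val other = new LockedRegistry
    other.functionBuilders.putAll(functionBuilders)
    other
  }
}
```

Note that `register` deliberately takes the read lock here: the read/write distinction separates point operations from whole-map operations, not reads from writes.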

Contributor


It may work, but it makes the code more complicated. We should only do it if it makes a difference to real-world workloads.

Contributor Author


In fact, it's safe here, because the entrySet() of ConcurrentHashMap is thread-safe. We don't need synchronized.
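
For context on what "thread-safe" covers here, the single-threaded sketch below shows the documented iterator behavior: a `java.util.HashMap` iterator fails fast when the map is structurally modified mid-iteration, while a `ConcurrentHashMap` iterator is weakly consistent and never throws `ConcurrentModificationException`. The `IterationSemantics` object is a hypothetical helper for illustration.

```scala
import java.util.{ConcurrentModificationException, HashMap => JHashMap, Map => JMap}
import java.util.concurrent.ConcurrentHashMap

// Hypothetical helper, for illustration only.
object IterationSemantics {
  /** Adds a new key while iterating over `map`; returns true if iteration finishes without throwing. */
  def survivesUpdateDuringIteration(map: JMap[String, String]): Boolean = {
    map.put("a", "1")
    map.put("b", "2")
    map.put("c", "3")
    try {
      val it = map.entrySet().iterator()
      it.next()
      map.put("added-during-iteration", "4") // structural modification mid-iteration
      while (it.hasNext) it.next()
      true
    } catch {
      case _: ConcurrentModificationException => false
    }
  }
}
```

Weak consistency means the iteration may or may not reflect updates made after the iterator was created, so a copy built this way is safe from exceptions but is not guaranteed to be an atomic snapshot of the map.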

@cloud-fan
Contributor

Shall we revert this if #44976 (comment) is a real issue? I don't think this is a critical path for performance (how much parallelism do you expect for function lookups in a Spark session?), and synchronizing this seems simpler and good enough.

riyaverm-db pushed a commit to riyaverm-db/spark that referenced this pull request Jun 7, 2024
Closes apache#44976 from beliefer/SPARK-46937.

Authored-by: beliefer <[email protected]>
Signed-off-by: beliefer <[email protected]>
@yaooqinn
Member

yaooqinn commented Jun 7, 2024

+1 for #44976 (comment)

@cloud-fan
Contributor

I've sent out the revert PR: #46940

cloud-fan added a commit that referenced this pull request Jun 11, 2024
…ctionRegistry"

### What changes were proposed in this pull request?

Reverts #44976 as it breaks thread-safety

### Why are the changes needed?

Fix thread-safety

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

N/A

### Was this patch authored or co-authored using generative AI tooling?

no

Closes #46940 from cloud-fan/revert.

Authored-by: Wenchen Fan <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>