
[Spark] Update sbt testing task of Delta Connect Client to automatically use the Spark latest jars #3670

Open · longvu-db wants to merge 20 commits into base: master

Conversation

@longvu-db (Contributor) commented Sep 11, 2024

Which Delta project/connector is this regarding?

  • Spark
  • Standalone
  • Flink
  • Kernel
  • Other (fill in here)

Description

Currently, to test the Delta Connect Client with Spark Connect, we always fetch the Spark 4.0 First Preview jars, which were created in May 2024.

Since then, a lot has changed in Spark, so those jars are outdated and the testing no longer works due to binary incompatibility between the latest version of Delta and the Spark 4.0 First Preview.

Therefore, we are updating the sbt testing task of the Delta Connect Client project to automatically fetch and use the latest jars from Spark nightly releases.

How was this patch tested?

Ran build/sbt -DsparkVersion=master clean connectServer/assembly connectClient/test.

Observed that the latest Spark jar releases are downloaded and that the Delta Connect Client test suite (DeltaTableSuite) ran successfully.

Does this PR introduce any user-facing changes?

No.

@longvu-db changed the title [Spark] Update fetching of Spark Connect jars for Delta Connect Client testing → [Spark] Change build.sbt to automatically fetch the latest Spark Master jars for Delta Connect Client testing Sep 11, 2024
@longvu-db changed the title [Spark] Change build.sbt to automatically fetch the latest Spark Master jars for Delta Connect Client testing → [Spark] Update sbt testing task of Delta Connect Client to automatically use the Spark latest jars Sep 11, 2024
TEST_PARALLELISM_COUNT=4 build/sbt -DsparkVersion=master "++ ${{ matrix.scala }}" clean connectServer/assembly connectClient/test
TEST_PARALLELISM_COUNT=4 build/sbt -DsparkVersion=master "++ ${{ matrix.scala }}" clean connectServer/test
@longvu-db (Contributor, Author):

Temporary change to test connectClient only: Delta's integration with Spark Master is currently broken, so we cannot test connectServer and spark.

val metadataUrl = new URL(latestSparkComponentJarDir + "maven-metadata.xml")

// Fetch and parse the maven-metadata.xml file.
val metadataXml = XML.load(metadataUrl)
@longvu-db (Contributor, Author) commented Sep 12, 2024:

For some reason, the XML that we download (screenshot below) is missing the snapshotVersions section that appears in https://repository.apache.org/content/groups/snapshots/org/apache/spark/spark-connect_2.13/4.0.0-SNAPSHOT/maven-metadata.xml.

[screenshot of the downloaded, truncated maven-metadata.xml]

I'm not sure why this is the case. I tried the other load APIs in https://javadoc.io/doc/org.scala-lang.modules/scala-xml_2.13/latest/scala/xml/XML$.html, and I'm not sure about changing the version of the scala-xml library in https://github.com/delta-io/delta/blob/master/project/plugins.sbt#L46.

To get the latest jar name, we can either:

  1. Extract the timestamp and buildNumber and combine them, or
  2. Extract the value nested inside snapshotVersion.

Option 1 works just as well as option 2, so I decided not to spend more time trying to make the metadata fetch return the full file.
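
For illustration, option 1 might look roughly like this in the build file (a sketch, not the exact code in this PR; it assumes latestSparkComponentJarDir and sparkComponentName are in scope as in the diff above, and that the snapshot version string is 4.0.0-SNAPSHOT):

// Sketch only; these imports may already be present in build.sbt.
import java.net.URL
import scala.xml.XML

val metadataUrl = new URL(latestSparkComponentJarDir + "maven-metadata.xml")
val metadataXml = XML.load(metadataUrl)

// For a snapshot, maven-metadata.xml contains:
//   <versioning><snapshot><timestamp>20240913.001640</timestamp>
//                         <buildNumber>365</buildNumber></snapshot>...</versioning>
val timestamp = (metadataXml \ "versioning" \ "snapshot" \ "timestamp").text
val buildNumber = (metadataXml \ "versioning" \ "snapshot" \ "buildNumber").text

// e.g. spark-connect_2.13-4.0.0-20240913.001640-365.jar
val latestJarName = s"$sparkComponentName-4.0.0-$timestamp-$buildNumber.jar"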

files
} else {
destDir.get()
}
}.taskValue,
(Test / resourceGenerators) += Def.task {
val src = url("https://repository.apache.org/content/groups/public/org/apache/spark/spark-connect_2.13/4.0.0-preview1/spark-connect_2.13-4.0.0-preview1.jar")
val dest = (Test / resourceManaged).value / "spark-connect.jar"
val sparkConnectComponentName = "spark-connect_2.13"
@longvu-db (Contributor, Author) commented Sep 12, 2024:

We define sparkComponentName on line 409 with the _2.13 suffix, so this is changed as well for consistency.

@@ -317,7 +367,6 @@ lazy val connectClient = (project in file("spark-connect/client"))
val destDir = (Test / resourceManaged).value / "spark"
if (!destDir.exists()) {
IO.createDirectory(destDir)
val files = mutable.Buffer.empty[File]
@longvu-db (Contributor, Author):

No need to maintain a list of files; we write to the directory directly.

@longvu-db (Contributor, Author) commented:

Hey @tomvanbussel, could you please take a look at this PR?

@@ -51,7 +51,7 @@ jobs:
- name: Run Spark Master tests
# when changing TEST_PARALLELISM_COUNT make sure to also change it in spark_test.yaml
run: |
TEST_PARALLELISM_COUNT=4 SHARD_ID=${{matrix.shard}} build/sbt -DsparkVersion=master "++ ${{ matrix.scala }}" clean spark/test
TEST_PARALLELISM_COUNT=4 build/sbt -DsparkVersion=master "++ ${{ matrix.scala }}" clean connectServer/test
TEST_PARALLELISM_COUNT=4 build/sbt -DsparkVersion=master "++ ${{ matrix.scala }}" clean connectServer/assembly connectClient/test
@longvu-db (Contributor, Author) commented Sep 13, 2024:

Temporary change to test connectClient only: Delta's integration with Spark Master is currently broken, so we cannot test connectServer and spark. I will put things back the way they were before merging.

TEST_PARALLELISM_COUNT=4 SHARD_ID=${{matrix.shard}} build/sbt -DsparkVersion=master "++ ${{ matrix.scala }}" clean spark/test
TEST_PARALLELISM_COUNT=4 build/sbt -DsparkVersion=master "++ ${{ matrix.scala }}" clean connectServer/test
TEST_PARALLELISM_COUNT=4 build/sbt -DsparkVersion=master "++ ${{ matrix.scala }}" clean connectServer/assembly connectClient/test
TEST_PARALLELISM_COUNT=1 build/sbt -DsparkVersion=master "++ ${{ matrix.scala }}" clean connectServer/assembly connectClient/test
@longvu-db (Contributor, Author) commented:

Temporary change to test connectClient only: Delta's integration with Spark Master is currently broken, so we cannot test connectServer and spark. I will put things back the way they were before merging.

@tomvanbussel (Collaborator) left a comment:

Nice work! It feels a bit wasteful though to download the full release, only to replace many of the JARs with newer versions. Another downside of this approach is that we cannot use the caching provided by SBT. Would it be possible to instead rely on SBT Assembly to package all of our dependencies into one JAR that we can then load in our tests? (This may require creating another project in the build file.)
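
For reference, a very rough sketch of what that suggestion could look like (this is not what the PR does; the project name, dependency coordinate, resolver, and merge strategy below are illustrative assumptions):

// Hypothetical extra project whose assembly bundles the Spark Connect server and
// its transitive dependencies, letting sbt/Coursier cache the artifacts.
lazy val connectServerDeps = (project in file("spark-connect/server-deps"))
  .settings(
    resolvers += "apache-snapshots" at "https://repository.apache.org/content/groups/snapshots/",
    libraryDependencies += "org.apache.spark" %% "spark-connect" % "4.0.0-SNAPSHOT",
    // Package everything into one jar that the client tests can put on the classpath.
    assembly / assemblyMergeStrategy := {
      case PathList("META-INF", "services", xs @ _*) => MergeStrategy.concat
      case PathList("META-INF", xs @ _*) => MergeStrategy.discard
      case _ => MergeStrategy.first
    }
  )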

@longvu-db (Contributor, Author) commented Sep 13, 2024:

@tomvanbussel I did think about that approach, but we still need to download the preview1 distribution to get the sbin bash scripts, along with potentially some other non-Spark jar files. I'm not sure how to get those other than from https://dist.apache.org/repos/dist/release/spark/spark-4.0.0-preview1/.

@longvu-db (Contributor, Author) commented:

@tomvanbussel Also, as you observed, I download the latest jar for each Spark component. When I named each jar sparkComponentName-4.0.0-20240913.001640-365.jar, the code failed with the following error.

[info] org.apache.spark.SparkException: java.util.ServiceConfigurationError: org.apache.spark.sql.sources.DataSourceRegister: Error accessing configuration file
[info] at java.util.ServiceLoader.fail(ServiceLoader.java:586)
[info] at java.util.ServiceLoader$LazyClassPathLookupIterator.parse(ServiceLoader.java:1180)
[info] at java.util.ServiceLoader$LazyClassPathLookupIterator.nextProviderClass(ServiceLoader.java:1213)
[info] at java.util.ServiceLoader$LazyClassPathLookupIterator.hasNextService(ServiceLoader.java:1228)
[info] at java.util.ServiceLoader$LazyClassPathLookupIterator.hasNext(ServiceLoader.java:1273)
[info] at java.util.ServiceLoader$2.hasNext(ServiceLoader.java:1309)
[info] at java.util.ServiceLoader$3.hasNext(ServiceLoader.java:1393)
[info] at scala.collection.convert.JavaCollectionWrappers$JIteratorWrapper.hasNext(JavaCollectionWrappers.scala:46)
[info] at scala.collection.StrictOptimizedIterableOps.filterImpl(StrictOptimizedIterableOps.scala:225)
[info] at scala.collection.StrictOptimizedIterableOps.filterImpl$(StrictOptimizedIterableOps.scala:222)
[info] at scala.collection.convert.JavaCollectionWrappers$JIterableWrapper.filterImpl(JavaCollectionWrappers.scala:83)
[info] at scala.collection.StrictOptimizedIterableOps.filter(StrictOptimizedIterableOps.scala:218)
[info] at scala.collection.StrictOptimizedIterableOps.filter$(StrictOptimizedIterableOps.scala:218)
[info] at scala.collection.convert.JavaCollectionWrappers$JIterableWrapper.filter(JavaCollectionWrappers.scala:83)
[info] at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:647)
[info] at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSourceV2(DataSource.scala:727)
[info] at org.apache.spark.sql.internal.DataFrameWriterImpl.lookupV2Provider(DataFrameWriterImpl.scala:618)
[info] at org.apache.spark.sql.internal.DataFrameWriterImpl.saveAsTable(DataFrameWriterImpl.scala:429)
[info] at org.apache.spark.sql.connect.planner.SparkConnectPlanner.handleWriteOperation(SparkConnectPlanner.scala:2823)
[info] at org.apache.spark.sql.connect.planner.SparkConnectPlanner.process(SparkConnectPlanner.scala:2413)
[info] at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.handleCommand(ExecuteThreadRunner.scala:291)
[info] at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.$anonfun$executeInternal$1(ExecuteThreadRunner.scala:191)
[info] at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.$anonfun$executeInternal$1$adapted(ExecuteThreadRunner.scala:163)
[info] at org.apache.spark.sql.connect.service.SessionHolder.$anonfun$withSession$2(SessionHolder.scala:336)
[info] at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:742)
[info] at org.apache.spark.sql.connect.service.SessionHolder.$anonfun$withSession$1(SessionHolder.scala:336)
[info] at org.apache.spark.JobArtifactSet$.withActiveJobArtifactState(JobArtifactSet.scala:94)
[info] at org.apache.spark.sql.artifact.ArtifactManager.$anonfun$withResources$1(ArtifactManager.scala:80)
[info] at org.apache.spark.util.Utils$.withContextClassLoader(Utils.scala:185)
[info] at org.apache.spark.sql.artifact.ArtifactManager.withResources(ArtifactManager.scala:79)
[info] at org.apache.spark.sql.connect.service.SessionHolder.withSession(SessionHolder.scala:335)
[info] at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.executeInternal(ExecuteThreadRunner.scala:163)
[info] at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.org$apache$spark$sql$connect$execution$ExecuteThreadRunner$$execute(ExecuteThreadRunner.scala:112)
[info] at org.apache.spark.sql.connect.execution.ExecuteThreadRunner$ExecutionThread.run(ExecuteThreadRunner.scala:318)
[info] Cause: org.apache.spark.SparkException: java.nio.file.NoSuchFileException: /Users/long.vu/delta/spark-connect/client/target/scala-2.13/resource_managed/test/spark/spark-4.0.0-preview1-bin-hadoop3/jars/spark-sql_2.13.jar
[info] at sun.nio.fs.UnixException.translateToIOException(UnixException.java:92)
[info] at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:106)
[info] at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:111)
[info] at sun.nio.fs.UnixFileAttributeViews$Basic.readAttributes(UnixFileAttributeViews.java:55)
[info] at sun.nio.fs.UnixFileSystemProvider.readAttributes(UnixFileSystemProvider.java:148)
[info] at java.nio.file.Files.readAttributes(Files.java:1851)
[info] at java.util.zip.ZipFile$Source.get(ZipFile.java:1428)
[info] at java.util.zip.ZipFile$CleanableResource.<init>(ZipFile.java:718)
[info] at java.util.zip.ZipFile.<init>(ZipFile.java:252)
[info] at java.util.zip.ZipFile.<init>(ZipFile.java:181)
[info] at java.util.jar.JarFile.<init>(JarFile.java:346)
[info] at sun.net.www.protocol.jar.URLJarFile.<init>(URLJarFile.java:103)
[info] at sun.net.www.protocol.jar.URLJarFile.getJarFile(URLJarFile.java:72)
[info] at sun.net.www.protocol.jar.JarFileFactory.get(JarFileFactory.java:168)
[info] at sun.net.www.protocol.jar.JarFileFactory.getOrCreate(JarFileFactory.java:91)
[info] at sun.net.www.protocol.jar.JarURLConnection.connect(JarURLConnection.java:132)
[info] at sun.net.www.protocol.jar.JarURLConnection.getInputStream(JarURLConnection.java:175)
[info] at java.util.ServiceLoader$LazyClassPathLookupIterator.parse(ServiceLoader.java:1172)
[info] at java.util.ServiceLoader$LazyClassPathLookupIterator.nextProviderClass(ServiceLoader.java:1213)
[info] at java.util.ServiceLoader$LazyClassPathLookupIterator.hasNextService(ServiceLoader.java:1228)
[info] at java.util.ServiceLoader$LazyClassPathLookupIterator.hasNext(ServiceLoader.java:1273)
[info] at java.util.ServiceLoader$2.hasNext(ServiceLoader.java:1309)
[info] at java.util.ServiceLoader$3.hasNext(ServiceLoader.java:1393)
[info] at scala.collection.convert.JavaCollectionWrappers$JIteratorWrapper.hasNext(JavaCollectionWrappers.scala:46)
[info] at scala.collection.StrictOptimizedIterableOps.filterImpl(StrictOptimizedIterableOps.scala:225)
[info] at scala.collection.StrictOptimizedIterableOps.filterImpl$(StrictOptimizedIterableOps.scala:222)
[info] at scala.collection.convert.JavaCollectionWrappers$JIterableWrapper.filterImpl(JavaCollectionWrappers.scala:83)
[info] at scala.collection.StrictOptimizedIterableOps.filter(StrictOptimizedIterableOps.scala:218)
[info] at scala.collection.StrictOptimizedIterableOps.filter$(StrictOptimizedIterableOps.scala:218)
[info] at scala.collection.convert.JavaCollectionWrappers$JIterableWrapper.filter(JavaCollectionWrappers.scala:83)
[info] at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:647)
[info] at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSourceV2(DataSource.scala:727)
[info] at org.apache.spark.sql.internal.DataFrameWriterImpl.lookupV2Provider(DataFrameWriterImpl.scala:618)
[info] at org.apache.spark.sql.internal.DataFrameWriterImpl.saveAsTable(DataFrameWriterImpl.scala:429)
[info] at org.apache.spark.sql.connect.planner.SparkConnectPlanner.handleWriteOperation(SparkConnectPlanner.scala:2823)
[info] at org.apache.spark.sql.connect.planner.SparkConnectPlanner.process(SparkConnectPlanner.scala:2413)
[info] at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.handleCommand(ExecuteThreadRunner.scala:291)
[info] at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.$anonfun$executeInternal$1(ExecuteThreadRunner.scala:191)
[info] at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.$anonfun$executeInternal$1$adapted(ExecuteThreadRunner.scala:163)
[info] at org.apache.spark.sql.connect.service.SessionHolder.$anonfun$withSession$2(SessionHolder.scala:336)
[info] at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:742)
[info] at org.apache.spark.sql.connect.service.SessionHolder.$anonfun$withSession$1(SessionHolder.scala:336)
[info] at org.apache.spark.JobArtifactSet$.withActiveJobArtifactState(JobArtifactSet.scala:94)
[info] at org.apache.spark.sql.artifact.ArtifactManager.$anonfun$withResources$1(ArtifactManager.scala:80)
[info] at org.apache.spark.util.Utils$.withContextClassLoader(Utils.scala:185)
[info] at org.apache.spark.sql.artifact.ArtifactManager.withResources(ArtifactManager.scala:79)
[info] at org.apache.spark.sql.connect.service.SessionHolder.withSession(SessionHolder.scala:335)
[info] at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.executeInternal(ExecuteThreadRunner.scala:163)
[info] at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.org$apache$spark$sql$connect$execution$ExecuteThreadRunner$$execute(ExecuteThreadRunner.scala:112)
[info] at org.apache.spark.sql.connect.execution.ExecuteThreadRunner$ExecutionThread.run(ExecuteThreadRunner.scala:318)

What I had to do was download each jar and then rename it to sparkComponentName.jar instead of sparkComponentName-4.0.0-20240913.001640-365.jar, for example spark-catalyst_2.13.jar, spark-connect_2.13.jar, and so on.

Only then did it start to work.
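
For illustration, the download-and-rename idea looks roughly like this (a sketch only, not the verbatim code in this PR; jarsDir, latestSparkComponentJarDir, latestJarName, and sparkComponentName are assumed to be in scope from the surrounding sbt task):

import java.net.URL
import java.nio.file.{Files, StandardCopyOption}

// Download the latest snapshot jar but save it under the unversioned name that
// the launcher scripts expect (e.g. spark-connect_2.13.jar).
val jarUrl = new URL(latestSparkComponentJarDir + latestJarName)
val destFile = jarsDir / s"$sparkComponentName.jar"
val in = jarUrl.openStream()
try Files.copy(in, destFile.toPath, StandardCopyOption.REPLACE_EXISTING)
finally in.close()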

So I'm not sure whether one huge uber jar would be accepted by start-connect-server.sh, which I think uses bin/spark-submit internally.

https://github.com/apache/spark/blob/master/sbin/start-connect-server.sh

@tomvanbussel (Collaborator) commented:

@longvu-db We can copy the few bash files that we need to this repo. No need to download a 500MB tar just for those small scripts.

@longvu-db (Contributor, Author) commented Sep 13, 2024:

@tomvanbussel But I'm not sure whether one huge Spark jar would work, because of the second reason above: spark-submit expects each individual Spark component jar.

@longvu-db (Contributor, Author) commented:

@tomvanbussel I can try to look into your suggestion of copying only the minimal set of needed files (the tar contains unrelated Spark material that we don't need). However, I do feel we need multiple separate Spark component jar files rather than one uber jar to keep SparkSubmit happy.

@longvu-db (Contributor, Author) commented:

Hey @xupefei, since Tom is OOO and you have experience with Delta and Spark Connect, would you be so kind as to take a look at this PR?

TL;DR: We test the Delta Connect Client against the Spark Connect Server. The last Spark distribution was the Spark 4.0 First Preview in May, and we can no longer use it because it contains outdated Spark jars that are no longer binary compatible with the current Delta-Spark.

Right now, I made it work by still downloading the Spark 4.0 Preview distribution and then replacing all of its jars with the latest nightly Spark jar releases.

The Spark 4.0 Preview distribution is useful to have since it contains the bash scripts in sbin and bin that are needed to start the Spark Connect Server.

The problem Tom raised is that the Spark 4.0 Preview distribution is 500MB, which might be inefficient. We could instead copy sbin and bin into Delta and download only the latest nightly Spark jar releases. I believe we cannot build one uber Spark jar in place of the individual Spark component jars, because the Spark Connect Server script (sbin/start-connect-server.sh), which uses bin/spark-submit under the hood, expects the individual Spark component jars.

I attempted to avoid downloading the Spark 4.0 Preview distribution by using the GitHub API to download the sbin and bin folders, fetching only the latest nightly Spark jar releases.
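
For illustration, that GitHub API attempt could look roughly like this (a hypothetical sketch, not the code in this PR; the JSON handling is deliberately naive and the unauthenticated GitHub API is rate-limited):

import java.net.URL
import java.nio.file.{Files, Paths, StandardCopyOption}
import scala.io.Source

// List a directory of apache/spark via the contents API and download each file.
def downloadSparkDir(path: String, destDir: String): Unit = {
  val api = s"https://api.github.com/repos/apache/spark/contents/$path?ref=master"
  val listing = Source.fromURL(new URL(api)).mkString
  // Pull out every "download_url" entry from the JSON listing.
  val urls = "\"download_url\"\\s*:\\s*\"([^\"]+)\"".r
    .findAllMatchIn(listing).map(_.group(1)).toList
  Files.createDirectories(Paths.get(destDir))
  urls.foreach { u =>
    val name = u.substring(u.lastIndexOf('/') + 1)
    val in = new URL(u).openStream()
    try Files.copy(in, Paths.get(destDir, name), StandardCopyOption.REPLACE_EXISTING)
    finally in.close()
  }
}

// e.g. downloadSparkDir("sbin", "target/spark/sbin")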

The problem is that this doesn't work, because the Spark component jars expect some external library jars, such as log4j-core.

We could get the list of those libraries based on this PR, or from the pom.xml of many Spark components, one example being https://github.com/apache/spark/blob/master/core/pom.xml.

But both of those sources include unnecessary jars, building additional scraping code would take time, and hard-coding the library names and versions in a list somewhere doesn't sound like a good idea since it can easily go out of date with version upgrades. The great thing about the Spark 4.0 Preview distribution is that it already contains the necessary jars.

My question is: how bad is it to download 500MB every time we want to test the connectClient project? What are the alternatives if we want to avoid that? And if downloading it is okay, could you please review this PR?

@@ -317,7 +367,6 @@ lazy val connectClient = (project in file("spark-connect/client"))
val destDir = (Test / resourceManaged).value / "spark"
@longvu-db (Contributor, Author):

Perhaps add a TODO
