Commit 713d0ed
Maintain support for building and deploying with Cloudera Hadoop 2.6.0
- Minor code changes
- Rewrite of [SPARK-25694] Implement custom URLStreamHandlerFactory work-around for backward compatibility with Hadoop 2.6
- Add Cloudera repositories
gbloisi-openaire committed Dec 4, 2023
1 parent 796d878 commit 713d0ed
Showing 6 changed files with 47 additions and 9 deletions.
@@ -86,7 +86,7 @@ trait KafkaDelegationTokenTest extends BeforeAndAfterEach {
   override def afterEach(): Unit = {
     try {
       Configuration.setConfiguration(null)
-      UserGroupInformation.reset()
+      //UserGroupInformation.reset()
       SparkEnv.set(savedSparkEnv)
     } finally {
       super.afterEach()
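The reset() call is commented out here and in HadoopDelegationTokenManagerSuite further down, presumably because UserGroupInformation.reset() is not available in Hadoop 2.6. A hypothetical alternative sketch (not what the commit does) would keep the cleanup on Hadoop versions that have the method by resolving it reflectively:

    import org.apache.hadoop.security.UserGroupInformation

    // Call reset() only when the running Hadoop version provides it;
    // on Hadoop 2.6 the method is absent and the cleanup is skipped.
    def resetUgiIfSupported(): Unit = {
      try {
        classOf[UserGroupInformation].getMethod("reset").invoke(null)
      } catch {
        case _: NoSuchMethodException => // Hadoop 2.6: no-op
      }
    }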
4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -55,7 +55,7 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}
 import org.apache.hadoop.io.compress.{CompressionCodecFactory, SplittableCompressionCodec}
 import org.apache.hadoop.security.UserGroupInformation
-import org.apache.hadoop.util.{RunJar, StringUtils}
+import org.apache.hadoop.util.RunJar
 import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.apache.logging.log4j.{Level, LogManager}
 import org.apache.logging.log4j.core.LoggerContext
@@ -594,7 +594,7 @@ private[spark] object Utils extends Logging {
     if (!source.exists()) {
       throw new FileNotFoundException(source.getAbsolutePath)
     }
-    val lowerSrc = StringUtils.toLowerCase(source.getName)
+    val lowerSrc = source.getName.toLowerCase(Locale.ROOT)
     if (lowerSrc.endsWith(".jar")) {
       RunJar.unJar(source, dest, RunJar.MATCH_ANY)
     } else if (lowerSrc.endsWith(".zip")) {
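The commit drops Hadoop's StringUtils.toLowerCase (presumably not present in Hadoop 2.6) in favor of the JDK call with an explicit Locale.ROOT. A minimal sketch of why the explicit locale matters (illustrative file name, not from the commit):

    import java.util.Locale

    // Under a Turkish default locale, uppercase 'I' lower-cases to the
    // dotless 'ı', which would break the ".zip" suffix check above.
    val name = "ARCHIVE.ZIP"
    val turkish = name.toLowerCase(new Locale("tr"))  // "archıve.zıp"
    val portable = name.toLowerCase(Locale.ROOT)      // "archive.zip"
    assert(!turkish.endsWith(".zip"))
    assert(portable.endsWith(".zip"))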
@@ -142,7 +142,7 @@ class HadoopDelegationTokenManagerSuite extends SparkFunSuite {
       if (kdc != null) {
         kdc.stop()
       }
-      UserGroupInformation.reset()
+      //UserGroupInformation.reset()
     }
   }
 }
22 changes: 20 additions & 2 deletions pom.xml
@@ -354,6 +354,21 @@
         <enabled>false</enabled>
       </snapshots>
     </repository>
+
+    <!--
+      This is used to get cloudera hadoop jars
+    -->
+    <repository>
+      <id>cloudera</id>
+      <name>Cloudera Repository</name>
+      <url>https://repository.cloudera.com/artifactory/cloudera-repos</url>
+      <releases>
+        <enabled>true</enabled>
+      </releases>
+      <snapshots>
+        <enabled>false</enabled>
+      </snapshots>
+    </repository>
   </repositories>
   <pluginRepositories>
     <pluginRepository>
@@ -3507,10 +3522,13 @@

     <profile>
       <id>hadoop-2</id>
+      <activation>
+        <activeByDefault>true</activeByDefault>
+      </activation>
       <properties>
         <!-- make sure to update IsolatedClientLoader whenever this version is changed -->
-        <hadoop.version>2.7.4</hadoop.version>
-        <curator.version>2.7.1</curator.version>
+        <hadoop.version>2.6.0-cdh5.9.2</hadoop.version>
+        <curator.version>2.6.0</curator.version>
         <commons-io.version>2.4</commons-io.version>
         <!--
           the declaration site above of these variables explains why we need to re-assign them here
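With activeByDefault set, a plain build now resolves Hadoop 2.6.0-cdh5.9.2 from the Cloudera repository without any -P flag (note that Maven suppresses activeByDefault as soon as any other profile is activated explicitly, so passing -Phadoop-2 remains the safer invocation). A quick sanity check from a shell built this way (assumed snippet, not part of the commit):

    import org.apache.hadoop.util.VersionInfo

    // Verify the CDH Hadoop artifacts actually ended up on the classpath.
    println(VersionInfo.getVersion)  // expected: 2.6.0-cdh5.9.2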
@@ -317,9 +317,9 @@ private[spark] class Client(
     sparkConf.get(ROLLED_LOG_INCLUDE_PATTERN).foreach { includePattern =>
       try {
         val logAggregationContext = Records.newRecord(classOf[LogAggregationContext])
-        logAggregationContext.setRolledLogsIncludePattern(includePattern)
+        logAggregationContext.setIncludePattern(includePattern)
         sparkConf.get(ROLLED_LOG_EXCLUDE_PATTERN).foreach { excludePattern =>
-          logAggregationContext.setRolledLogsExcludePattern(excludePattern)
+          logAggregationContext.setExcludePattern(excludePattern)
         }
         appContext.setLogAggregationContext(logAggregationContext)
       } catch {
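The rolled-logs setters are swapped for the older setIncludePattern/setExcludePattern, which is what Hadoop 2.6.0's LogAggregationContext exposes. If one binary had to support both API generations, a hypothetical reflective shim (not what the commit does) could prefer the newer setter and fall back:

    import org.apache.hadoop.yarn.api.records.LogAggregationContext

    // Prefer setRolledLogsIncludePattern when the running Hadoop version
    // has it; otherwise use the Hadoop 2.6 setIncludePattern.
    def setIncludePatternCompat(ctx: LogAggregationContext, pattern: String): Unit = {
      try {
        ctx.getClass.getMethod("setRolledLogsIncludePattern", classOf[String])
          .invoke(ctx, pattern)
      } catch {
        case _: NoSuchMethodException => ctx.setIncludePattern(pattern)
      }
    }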
@@ -18,6 +18,8 @@
 package org.apache.spark.sql.internal

 import java.net.URL
+import java.net.URLStreamHandler
+import java.net.URLStreamHandlerFactory
 import java.util.UUID
 import java.util.concurrent.ConcurrentHashMap
 import javax.annotation.concurrent.GuardedBy
@@ -196,7 +198,25 @@ object SharedState extends Logging {
     synchronized {
       if (!fsUrlStreamHandlerFactoryInitialized) {
         try {
-          URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory(hadoopConf))
+          URL.setURLStreamHandlerFactory(new URLStreamHandlerFactory() {
+            private val fsUrlStreamHandlerFactory = new FsUrlStreamHandlerFactory(hadoopConf)
+
+            override def createURLStreamHandler(protocol: String): URLStreamHandler = {
+              val handler = fsUrlStreamHandlerFactory.createURLStreamHandler(protocol)
+              if (handler == null) {
+                return null
+              }
+
+              if (protocol != null &&
+                  (protocol.equalsIgnoreCase("http")
+                    || protocol.equalsIgnoreCase("https"))) {
+                // return null to use system default URLStreamHandler
+                null
+              } else {
+                handler
+              }
+            }
+          })
           fsUrlStreamHandlerFactoryInitialized = true
         } catch {
           case NonFatal(_) =>
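The rewritten factory delegates to Hadoop's FsUrlStreamHandlerFactory but deliberately answers null for http and https, so the JVM falls back to its built-in handlers for those schemes. A sketch of the resulting behavior (hypothetical URLs; assumes the factory above has already been installed):

    import java.net.URL

    // hdfs:// URLs resolve through Hadoop's handler, so constructing one
    // succeeds; http(s) URLs keep the JDK's default handler.
    val hdfsUrl = new URL("hdfs://namenode:8020/user/spark/data.parquet")
    val httpsUrl = new URL("https://spark.apache.org/")
    assert(httpsUrl.openConnection().isInstanceOf[java.net.HttpURLConnection])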
