Merge remote-tracking branch 'upstream/master' into remove_analyzed_from_create_temp_view
imback82 committed Mar 25, 2021
2 parents a14b3b9 + 7838f55 commit 507b00c
Showing 65 changed files with 3,705 additions and 937 deletions.
10 changes: 5 additions & 5 deletions core/src/main/scala/org/apache/spark/ContextCleaner.scala
@@ -172,18 +172,18 @@ private[spark] class ContextCleaner(
registerForCleanup(rdd, CleanCheckpoint(parentId))
}

/** Register an object for cleanup. */
private def registerForCleanup(objectForCleanup: AnyRef, task: CleanupTask): Unit = {
referenceBuffer.add(new CleanupTaskWeakReference(task, objectForCleanup, referenceQueue))
}

/** Register a SparkListener to be cleaned up when its owner is garbage collected. */
def registerSparkListenerForCleanup(
listenerOwner: AnyRef,
listener: SparkListener): Unit = {
registerForCleanup(listenerOwner, CleanSparkListener(listener))
}

/** Register an object for cleanup. */
private def registerForCleanup(objectForCleanup: AnyRef, task: CleanupTask): Unit = {
referenceBuffer.add(new CleanupTaskWeakReference(task, objectForCleanup, referenceQueue))
}

/** Keep cleaning RDD, shuffle, and broadcast state. */
private def keepCleaning(): Unit = Utils.tryOrStopSparkContext(sc) {
while (!stopped) {
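Both registration methods above feed the weak-reference machinery that keepCleaning() drains. A minimal sketch of that pattern (illustrative classes, not Spark's actual ones):

```scala
import java.lang.ref.{ReferenceQueue, WeakReference}

// A weak reference ties a cleanup task to an owner object. Once the owner is
// garbage collected, the reference surfaces on the queue and the task runs.
class CleanupRef(owner: AnyRef, val task: () => Unit, queue: ReferenceQueue[AnyRef])
  extends WeakReference[AnyRef](owner, queue)

def drainOnce(queue: ReferenceQueue[AnyRef]): Unit = {
  // poll() is non-blocking; ContextCleaner's keepCleaning() uses remove(timeout) in a loop.
  Option(queue.poll()).foreach {
    case ref: CleanupRef => ref.task()
    case _ => // not one of ours; ignore
  }
}
```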
core/src/main/scala/org/apache/spark/deploy/security/HadoopFSDelegationTokenProvider.scala
@@ -48,7 +48,12 @@ private[deploy] class HadoopFSDelegationTokenProvider
creds: Credentials): Option[Long] = {
try {
val fileSystems = HadoopFSDelegationTokenProvider.hadoopFSsToAccess(sparkConf, hadoopConf)
val fetchCreds = fetchDelegationTokens(getTokenRenewer(hadoopConf), fileSystems, creds)
// The hosts of the file systems to be excluded from token renewal
val fsToExclude = sparkConf.get(YARN_KERBEROS_FILESYSTEM_RENEWAL_EXCLUDE)
.map(new Path(_).getFileSystem(hadoopConf).getUri.getHost)
.toSet
val fetchCreds = fetchDelegationTokens(getTokenRenewer(hadoopConf), fileSystems, creds,
fsToExclude)

// Get the token renewal interval if it is not set. It will only be called once.
if (tokenRenewalInterval == null) {
@@ -99,11 +104,18 @@ private[deploy] class HadoopFSDelegationTokenProvider
private def fetchDelegationTokens(
renewer: String,
filesystems: Set[FileSystem],
creds: Credentials): Credentials = {
creds: Credentials,
fsToExclude: Set[String]): Credentials = {

filesystems.foreach { fs =>
logInfo(s"getting token for: $fs with renewer $renewer")
fs.addDelegationTokens(renewer, creds)
if (fsToExclude.contains(fs.getUri.getHost)) {
// YARN RM skips renewing token with empty renewer
logInfo(s"getting token for: $fs with empty renewer to skip renewal")
fs.addDelegationTokens("", creds)
} else {
logInfo(s"getting token for: $fs with renewer $renewer")
fs.addDelegationTokens(renewer, creds)
}
}

creds
@@ -119,7 +131,7 @@ private[deploy] class HadoopFSDelegationTokenProvider
val renewer = UserGroupInformation.getCurrentUser().getUserName()

val creds = new Credentials()
fetchDelegationTokens(renewer, filesystems, creds)
fetchDelegationTokens(renewer, filesystems, creds, Set.empty)

val renewIntervals = creds.getAllTokens.asScala.filter {
_.decodeIdentifier().isInstanceOf[AbstractDelegationTokenIdentifier]
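The exclusion rule above keys on the host of each filesystem's URI and substitutes an empty renewer, which the YARN RM treats as "do not renew". A condensed sketch of that decision (the helper name is made up; it mirrors the hunk above rather than Spark's actual method):

```scala
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.security.Credentials

// Fetch a delegation token for `fs`, but hand the RM an empty renewer when the
// filesystem's host is on the exclusion list, so that token is never renewed.
def addTokensFor(
    fs: FileSystem,
    renewer: String,
    fsToExclude: Set[String],
    creds: Credentials): Unit = {
  val effectiveRenewer =
    if (fsToExclude.contains(fs.getUri.getHost)) "" else renewer
  fs.addDelegationTokens(effectiveRenewer, creds)
}
```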
12 changes: 12 additions & 0 deletions core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -716,6 +716,18 @@ package object config {
.toSequence
.createWithDefault(Nil)

private[spark] val YARN_KERBEROS_FILESYSTEM_RENEWAL_EXCLUDE =
ConfigBuilder("spark.yarn.kerberos.renewal.excludeHadoopFileSystems")
.doc("The list of Hadoop filesystem URLs whose hosts will be excluded from " +
"delegation token renewal at the resource scheduler. Currently this is known to " +
"work under YARN, so the YARN Resource Manager won't renew tokens for the application. " +
"Note that because the resource scheduler does not renew tokens, any application running " +
"longer than the original token expiration that tries to use that token will likely fail.")
.version("3.2.0")
.stringConf
.toSequence
.createWithDefault(Nil)

private[spark] val EXECUTOR_INSTANCES = ConfigBuilder("spark.executor.instances")
.version("1.0.0")
.intConf
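A usage sketch for the new config, assuming a job that reads from a remote, Kerberized HDFS whose tokens the local YARN RM cannot renew. The URLs are placeholders; the key takes a comma-separated list because of `.toSequence`:

```scala
import org.apache.spark.SparkConf

// Exclude two hypothetical remote namenodes from delegation token renewal.
val conf = new SparkConf()
  .set("spark.yarn.kerberos.renewal.excludeHadoopFileSystems",
    "hdfs://audit-nn.example.com:8020,hdfs://archive-nn.example.com:8020")
```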
core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala
@@ -26,7 +26,7 @@ import scala.collection.mutable
import com.esotericsoftware.kryo.{Kryo, Serializer => KSerializer}
import com.esotericsoftware.kryo.io.{Input => KryoInput, Output => KryoOutput}
import org.apache.avro.{Schema, SchemaNormalization}
import org.apache.avro.generic.{GenericData, GenericRecord}
import org.apache.avro.generic.{GenericContainer, GenericData}
import org.apache.avro.io._
import org.apache.commons.io.IOUtils

@@ -35,17 +35,18 @@ import org.apache.spark.io.CompressionCodec
import org.apache.spark.util.Utils

/**
* Custom serializer used for generic Avro records. If the user registers the schemas
* Custom serializer used for generic Avro containers. If the user registers the schemas
* ahead of time, then the schema's fingerprint will be sent with each message instead of the actual
* schema, as to reduce network IO.
* Actions like parsing or compressing schemas are computationally expensive so the serializer
* caches all previously seen values as to reduce the amount of work needed to do.
* @param schemas a map where the keys are unique IDs for Avro schemas and the values are the
* string representation of the Avro schema, used to decrease the amount of data
* that needs to be serialized.
* @tparam D the subtype of [[GenericContainer]] handled by this serializer
*/
private[serializer] class GenericAvroSerializer(schemas: Map[Long, String])
extends KSerializer[GenericRecord] {
private[serializer] class GenericAvroSerializer[D <: GenericContainer]
(schemas: Map[Long, String]) extends KSerializer[D] {

/** Used to reduce the amount of effort to compress the schema */
private val compressCache = new mutable.HashMap[Schema, Array[Byte]]()
@@ -100,10 +101,10 @@ private[serializer] class GenericAvroSerializer(schemas: Map[Long, String])
})

/**
* Serializes a record to the given output stream. It caches a lot of the internal data as
* to not redo work
* Serializes a generic container to the given output stream. It caches a lot of the internal
* data as to not redo work
*/
def serializeDatum[R <: GenericRecord](datum: R, output: KryoOutput): Unit = {
def serializeDatum(datum: D, output: KryoOutput): Unit = {
val encoder = EncoderFactory.get.binaryEncoder(output, null)
val schema = datum.getSchema
val fingerprint = fingerprintCache.getOrElseUpdate(schema, {
@@ -121,16 +122,16 @@ private[serializer] class GenericAvroSerializer(schemas: Map[Long, String])
}

writerCache.getOrElseUpdate(schema, GenericData.get.createDatumWriter(schema))
.asInstanceOf[DatumWriter[R]]
.asInstanceOf[DatumWriter[D]]
.write(datum, encoder)
encoder.flush()
}

/**
* Deserializes generic records into their in-memory form. There is internal
* Deserializes generic containers into their in-memory form. There is internal
* state to keep a cache of already seen schemas and datum readers.
*/
def deserializeDatum(input: KryoInput): GenericRecord = {
def deserializeDatum(input: KryoInput): D = {
val schema = {
if (input.readBoolean()) {
val fingerprint = input.readLong()
@@ -151,13 +152,13 @@ private[serializer] class GenericAvroSerializer(schemas: Map[Long, String])
}
val decoder = DecoderFactory.get.directBinaryDecoder(input, null)
readerCache.getOrElseUpdate(schema, GenericData.get.createDatumReader(schema))
.asInstanceOf[DatumReader[GenericRecord]]
.read(null, decoder)
.asInstanceOf[DatumReader[D]]
.read(null.asInstanceOf[D], decoder)
}

override def write(kryo: Kryo, output: KryoOutput, datum: GenericRecord): Unit =
override def write(kryo: Kryo, output: KryoOutput, datum: D): Unit =
serializeDatum(datum, output)

override def read(kryo: Kryo, input: KryoInput, datumClass: Class[GenericRecord]): GenericRecord =
override def read(kryo: Kryo, input: KryoInput, datumClass: Class[D]): D =
deserializeDatum(input)
}
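The fingerprint cache above only pays off when schemas are registered ahead of time. A minimal sketch of how an application might do that, assuming a made-up `User` schema; `registerAvroSchemas` is the existing SparkConf hook that populates the serializer's `schemas` map:

```scala
import org.apache.avro.{Schema, SchemaBuilder}
import org.apache.avro.generic.GenericData
import org.apache.spark.SparkConf

// Build a schema and register it so only its 64-bit fingerprint travels with
// each serialized value instead of the full schema JSON.
val userSchema: Schema = SchemaBuilder.record("User").fields()
  .requiredString("name")
  .requiredInt("age")
  .endRecord()

val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .registerAvroSchemas(userSchema)

// With this change, non-record containers (arrays, enums, fixed) built from
// registered schemas go through the same serializer as GenericData.Record.
val user = new GenericData.Record(userSchema)
user.put("name", "alice")
user.put("age", 30)
```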
core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
@@ -33,7 +33,7 @@ import com.esotericsoftware.kryo.io.{UnsafeInput => KryoUnsafeInput, UnsafeOutpu
import com.esotericsoftware.kryo.pool.{KryoCallback, KryoFactory, KryoPool}
import com.esotericsoftware.kryo.serializers.{JavaSerializer => KryoJavaSerializer}
import com.twitter.chill.{AllScalaRegistrar, EmptyScalaKryoInstantiator}
import org.apache.avro.generic.{GenericData, GenericRecord}
import org.apache.avro.generic.{GenericContainer, GenericData, GenericRecord}
import org.roaringbitmap.RoaringBitmap

import org.apache.spark._
@@ -153,8 +153,18 @@ class KryoSerializer(conf: SparkConf)
kryo.register(classOf[SerializableJobConf], new KryoJavaSerializer())
kryo.register(classOf[PythonBroadcast], new KryoJavaSerializer())

kryo.register(classOf[GenericRecord], new GenericAvroSerializer(avroSchemas))
kryo.register(classOf[GenericData.Record], new GenericAvroSerializer(avroSchemas))
// Register serializers for Avro GenericContainer classes
// We do not handle SpecificRecordBase and SpecificFixed here. They are abstract classes and
// we will need to register serializers for their concrete implementations individually.
// Also, their serialization requires the use of SpecificDatum(Reader|Writer) instead of
// GenericDatum(Reader|Writer).
def registerAvro[T <: GenericContainer]()(implicit ct: ClassTag[T]): Unit =
kryo.register(ct.runtimeClass, new GenericAvroSerializer[T](avroSchemas))
registerAvro[GenericRecord]
registerAvro[GenericData.Record]
registerAvro[GenericData.Array[_]]
registerAvro[GenericData.EnumSymbol]
registerAvro[GenericData.Fixed]

// Use the default classloader when calling the user registrator.
Utils.withContextClassLoader(classLoader) {
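A hedged round-trip sketch of the newly covered container types, driving the serializer directly; the enum schema and symbol are illustrative:

```scala
import org.apache.avro.SchemaBuilder
import org.apache.avro.generic.GenericData
import org.apache.spark.SparkConf
import org.apache.spark.serializer.KryoSerializer

// An enum symbol now routes through GenericAvroSerializer rather than Kryo's
// default field serializer, so it benefits from the schema fingerprint cache.
val suitSchema = SchemaBuilder.enumeration("Suit")
  .symbols("SPADES", "HEARTS", "DIAMONDS", "CLUBS")
val symbol = new GenericData.EnumSymbol(suitSchema, "HEARTS")

val serializer = new KryoSerializer(new SparkConf().registerAvroSchemas(suitSchema))
  .newInstance()
val roundTripped =
  serializer.deserialize[GenericData.EnumSymbol](serializer.serialize(symbol))
```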
core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala
@@ -51,15 +51,17 @@ private[spark] class BlockStoreShuffleReader[K, C](
true
}
val useOldFetchProtocol = conf.get(config.SHUFFLE_USE_OLD_FETCH_PROTOCOL)
// SPARK-34790: Fetching continuous blocks in batch is incompatible with io encryption.
val ioEncryption = conf.get(config.IO_ENCRYPTION_ENABLED)

val doBatchFetch = shouldBatchFetch && serializerRelocatable &&
(!compressed || codecConcatenation) && !useOldFetchProtocol
(!compressed || codecConcatenation) && !useOldFetchProtocol && !ioEncryption
if (shouldBatchFetch && !doBatchFetch) {
logDebug("The feature tag of continuous shuffle block fetching is set to true, but " +
"we can not enable the feature because other conditions are not satisfied. " +
s"Shuffle compress: $compressed, serializer relocatable: $serializerRelocatable, " +
s"codec concatenation: $codecConcatenation, use old shuffle fetch protocol: " +
s"$useOldFetchProtocol.")
s"$useOldFetchProtocol, io encryption: $ioEncryption.")
}
doBatchFetch
}
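Restating the gating predicate from the hunk above as a standalone function, with the new I/O-encryption condition included. Parameter names mirror the locals in the diff; this is a paraphrase, not Spark's code:

```scala
// Continuous shuffle block batch fetch is enabled only when every condition
// holds; SPARK-34790 adds the requirement that I/O encryption be disabled.
def canBatchFetch(
    shouldBatchFetch: Boolean,
    serializerRelocatable: Boolean,
    compressed: Boolean,
    codecConcatenation: Boolean,
    useOldFetchProtocol: Boolean,
    ioEncryption: Boolean): Boolean = {
  shouldBatchFetch && serializerRelocatable &&
    (!compressed || codecConcatenation) &&
    !useOldFetchProtocol && !ioEncryption
}
```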