From bf2b6813e4ac28bb437bc2fa91dfa82e954881ae Mon Sep 17 00:00:00 2001
From: wayneguow
Date: Tue, 31 Jan 2023 11:08:02 +0800
Subject: [PATCH 01/11] rebase master

---
 .../sort/io/LocalDiskShuffleMapOutputWriter.java       |  2 +-
 .../org/apache/spark/internal/config/package.scala     | 10 ++++++++--
 .../sort/io/LocalDiskShuffleMapOutputWriterSuite.scala |  2 +-
 docs/core-migration-guide.md                           |  3 +++
 4 files changed, 13 insertions(+), 4 deletions(-)

diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleMapOutputWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleMapOutputWriter.java
index 606bb625f5b22..c0b9018c770a0 100644
--- a/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleMapOutputWriter.java
+++ b/core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleMapOutputWriter.java
@@ -74,7 +74,7 @@ public LocalDiskShuffleMapOutputWriter(
     this.blockResolver = blockResolver;
     this.bufferSize =
       (int) (long) sparkConf.get(
-        package$.MODULE$.SHUFFLE_UNSAFE_FILE_OUTPUT_BUFFER_SIZE()) * 1024;
+        package$.MODULE$.SHUFFLE_LOCAL_DISK_FILE_OUTPUT_BUFFER_SIZE()) * 1024;
     this.partitionLengths = new long[numPartitions];
     this.outputFile = blockResolver.getDataFile(shuffleId, mapId);
     this.outputTempFile = null;
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index dc3edfaa86133..e9e2a80a96501 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -1463,8 +1463,7 @@ package object config {
 
   private[spark] val SHUFFLE_UNSAFE_FILE_OUTPUT_BUFFER_SIZE =
     ConfigBuilder("spark.shuffle.unsafe.file.output.buffer")
-      .doc("The file system for this buffer size after each partition " +
-        "is written in unsafe shuffle writer. In KiB unless otherwise specified.")
+      .doc("(Deprecated since Spark 3.4, please use 'spark.shuffle.localDisk.file.output.buffer'.)")
      .version("2.3.0")
       .bytesConf(ByteUnit.KiB)
       .checkValue(v => v > 0 && v <= ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH / 1024,
@@ -1472,6 +1471,13 @@ package object config {
         s" ${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH / 1024}.")
       .createWithDefaultString("32k")
 
+  private[spark] val SHUFFLE_LOCAL_DISK_FILE_OUTPUT_BUFFER_SIZE =
+    ConfigBuilder("spark.shuffle.localDisk.file.output.buffer")
+      .doc("The file system for this buffer size after each partition " +
+        "is written in all local disk shuffle writers. In KiB unless otherwise specified.")
+      .version("3.4.0")
+      .fallbackConf(SHUFFLE_UNSAFE_FILE_OUTPUT_BUFFER_SIZE)
+
   private[spark] val SHUFFLE_DISK_WRITE_BUFFER_SIZE =
     ConfigBuilder("spark.shuffle.spill.diskWriteBufferSize")
       .doc("The buffer size, in bytes, to use when writing the sorted records to an on-disk file.")
diff --git a/core/src/test/scala/org/apache/spark/shuffle/sort/io/LocalDiskShuffleMapOutputWriterSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/sort/io/LocalDiskShuffleMapOutputWriterSuite.scala
index 3db7527262568..7ab2cb864234f 100644
--- a/core/src/test/scala/org/apache/spark/shuffle/sort/io/LocalDiskShuffleMapOutputWriterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/shuffle/sort/io/LocalDiskShuffleMapOutputWriterSuite.scala
@@ -71,7 +71,7 @@ class LocalDiskShuffleMapOutputWriterSuite extends SparkFunSuite {
     partitionSizesInMergedFile = null
     conf = new SparkConf()
       .set("spark.app.id", "example.spark.app")
-      .set("spark.shuffle.unsafe.file.output.buffer", "16k")
+      .set("spark.shuffle.localDisk.file.output.buffer", "16k")
     when(blockResolver.getDataFile(anyInt, anyLong)).thenReturn(mergedOutputFile)
     when(blockResolver.createTempFile(any(classOf[File])))
       .thenAnswer { invocationOnMock =>
diff --git a/docs/core-migration-guide.md b/docs/core-migration-guide.md
index 28a9dd0f43715..590defb24be15 100644
--- a/docs/core-migration-guide.md
+++ b/docs/core-migration-guide.md
@@ -66,6 +66,9 @@ license: |
 
 - Since Spark 3.4, Spark will use RocksDB store if `spark.history.store.hybridStore.enabled` is true. To restore the behavior before Spark 3.4, you can set `spark.history.store.hybridStore.diskBackend` to `LEVELDB`.
 
+- Since Spark 3.4, `spark.shuffle.unsafe.file.output.buffer` is deprecated though still works. Use `spark.shuffle.localDisk.file.output.buffer` instead.
+
+
 ## Upgrading from Core 3.2 to 3.3
 
 - Since Spark 3.3, Spark migrates its log4j dependency from 1.x to 2.x because log4j 1.x has reached end of life and is no longer supported by the community. Vulnerabilities reported after August 2015 against log4j 1.x were not checked and will not be fixed. Users should rewrite original log4j properties files using log4j2 syntax (XML, JSON, YAML, or properties format). Spark rewrites the `conf/log4j.properties.template` which is included in Spark distribution, to `conf/log4j2.properties.template` with log4j2 properties format.

From a77cd1cabf3e263712507daed956b2ab176c12a5 Mon Sep 17 00:00:00 2001
From: wayneguow
Date: Tue, 31 Jan 2023 11:09:31 +0800
Subject: [PATCH 02/11] remove blank

---
 docs/core-migration-guide.md | 1 -
 1 file changed, 1 deletion(-)

diff --git a/docs/core-migration-guide.md b/docs/core-migration-guide.md
index 590defb24be15..4a495245db677 100644
--- a/docs/core-migration-guide.md
+++ b/docs/core-migration-guide.md
@@ -68,7 +68,6 @@ license: |
 
 - Since Spark 3.4, `spark.shuffle.unsafe.file.output.buffer` is deprecated though still works. Use `spark.shuffle.localDisk.file.output.buffer` instead.
 
-
 ## Upgrading from Core 3.2 to 3.3
 
 - Since Spark 3.3, Spark migrates its log4j dependency from 1.x to 2.x because log4j 1.x has reached end of life and is no longer supported by the community. Vulnerabilities reported after August 2015 against log4j 1.x were not checked and will not be fixed. Users should rewrite original log4j properties files using log4j2 syntax (XML, JSON, YAML, or properties format). Spark rewrites the `conf/log4j.properties.template` which is included in Spark distribution, to `conf/log4j2.properties.template` with log4j2 properties format.

From 5a8246ce6c2f70d31f09a19be743c74ca0d918c1 Mon Sep 17 00:00:00 2001
From: wayneguow
Date: Tue, 31 Jan 2023 13:23:14 +0800
Subject: [PATCH 03/11] update docs

---
 docs/configuration.md | 12 ++++++++++--
 1 file changed, 10 insertions(+), 2 deletions(-)

diff --git a/docs/configuration.md b/docs/configuration.md
index aaaaca05341d1..9a54e14f11300 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -1031,11 +1031,19 @@ Apart from these, the following properties are also available, and may be useful
   <td><code>spark.shuffle.unsafe.file.output.buffer</code></td>
   <td>32k</td>
   <td>
-    The file system for this buffer size after each partition is written in unsafe shuffle writer.
-    In KiB unless otherwise specified.
+    Deprecated since Spark 3.5, please use <code>spark.shuffle.localDisk.file.output.buffer</code>.
   </td>
   <td>2.3.0</td>
 </tr>
+<tr>
+  <td><code>spark.shuffle.localDisk.file.output.buffer</code></td>
+  <td>32k</td>
+  <td>
+    The file system for this buffer size after each partition is written in in all local disk shuffle writers.
+    In KiB unless otherwise specified.
+  </td>
+  <td>3.5.0</td>
+</tr>
 <tr>
   <td><code>spark.shuffle.spill.diskWriteBufferSize</code></td>
   <td>1024 * 1024</td>

From b89264fe4b557386f9fca26721abf28b06645c7f Mon Sep 17 00:00:00 2001
From: wayneguow
Date: Sun, 5 Feb 2023 22:24:10 +0800
Subject: [PATCH 04/11] change target to Spark 3.5

---
 .../main/scala/org/apache/spark/internal/config/package.scala | 4 ++--
 docs/core-migration-guide.md                                  | 4 ++--
 2 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index e9e2a80a96501..584d6b88e7372 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -1463,7 +1463,7 @@ package object config {
 
   private[spark] val SHUFFLE_UNSAFE_FILE_OUTPUT_BUFFER_SIZE =
     ConfigBuilder("spark.shuffle.unsafe.file.output.buffer")
-      .doc("(Deprecated since Spark 3.4, please use 'spark.shuffle.localDisk.file.output.buffer'.)")
+      .doc("(Deprecated since Spark 3.5, please use 'spark.shuffle.localDisk.file.output.buffer'.)")
       .version("2.3.0")
       .bytesConf(ByteUnit.KiB)
       .checkValue(v => v > 0 && v <= ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH / 1024,
@@ -1475,7 +1475,7 @@ package object config {
     ConfigBuilder("spark.shuffle.localDisk.file.output.buffer")
       .doc("The file system for this buffer size after each partition " +
         "is written in all local disk shuffle writers. In KiB unless otherwise specified.")
-      .version("3.4.0")
+      .version("3.5.0")
       .fallbackConf(SHUFFLE_UNSAFE_FILE_OUTPUT_BUFFER_SIZE)
 
   private[spark] val SHUFFLE_DISK_WRITE_BUFFER_SIZE =
diff --git a/docs/core-migration-guide.md b/docs/core-migration-guide.md
index 4a495245db677..f6b6d6d463db4 100644
--- a/docs/core-migration-guide.md
+++ b/docs/core-migration-guide.md
@@ -50,6 +50,8 @@ license: |
 
 - Since Spark 4.0, Spark performs speculative executions less agressively with `spark.speculation.multiplier=3` and `spark.speculation.quantile=0.9`. To restore the legacy behavior, you can set `spark.speculation.multiplier=1.5` and `spark.speculation.quantile=0.75`.
 
+- Since Spark 4.0, `spark.shuffle.unsafe.file.output.buffer` is deprecated though still works. Use `spark.shuffle.localDisk.file.output.buffer` instead.
+
 ## Upgrading from Core 3.4 to 3.5
 
 - Since Spark 3.5, `spark.yarn.executor.failuresValidityInterval` is deprecated. Use `spark.executor.failuresValidityInterval` instead.
@@ -66,8 +68,6 @@ license: |
 
 - Since Spark 3.4, Spark will use RocksDB store if `spark.history.store.hybridStore.enabled` is true. To restore the behavior before Spark 3.4, you can set `spark.history.store.hybridStore.diskBackend` to `LEVELDB`.
 
-- Since Spark 3.4, `spark.shuffle.unsafe.file.output.buffer` is deprecated though still works. Use `spark.shuffle.localDisk.file.output.buffer` instead.
-
 ## Upgrading from Core 3.2 to 3.3
 
 - Since Spark 3.3, Spark migrates its log4j dependency from 1.x to 2.x because log4j 1.x has reached end of life and is no longer supported by the community. Vulnerabilities reported after August 2015 against log4j 1.x were not checked and will not be fixed. Users should rewrite original log4j properties files using log4j2 syntax (XML, JSON, YAML, or properties format). Spark rewrites the `conf/log4j.properties.template` which is included in Spark distribution, to `conf/log4j2.properties.template` with log4j2 properties format.

From 74bb68a9cf418d2e68072012b689d2894362557d Mon Sep 17 00:00:00 2001
From: wayneguow
Date: Tue, 14 Feb 2023 16:09:13 +0800
Subject: [PATCH 05/11] add to deprecatedConfigs

---
 core/src/main/scala/org/apache/spark/SparkConf.scala | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index 95955455a9d4b..6806f83d3e1b2 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -647,7 +647,9 @@ private[spark] object SparkConf extends Logging {
       DeprecatedConfig("spark.yarn.blacklist.executor.launch.blacklisting.enabled", "3.1.0",
         "Please use spark.yarn.executor.launch.excludeOnFailure.enabled"),
       DeprecatedConfig("spark.network.remoteReadNioBufferConversion", "3.5.2",
-        "Please open a JIRA ticket to report it if you need to use this configuration.")
+        "Please open a JIRA ticket to report it if you need to use this configuration."),
+      DeprecatedConfig("spark.shuffle.unsafe.file.output.buffer", "3.5.2",
+        "Please use spark.shuffle.localDisk.file.output.buffer")
     )
 
     Map(configs.map { cfg => (cfg.key -> cfg) } : _*)

From 3236f5f41d0dc74ae6f08a77ce6ebd7c4638b36f Mon Sep 17 00:00:00 2001
From: wayneguow
Date: Sat, 6 May 2023 16:16:15 +0800
Subject: [PATCH 06/11] update docs

---
 docs/configuration.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/docs/configuration.md b/docs/configuration.md
index 9a54e14f11300..a4613227c9046 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -1039,7 +1039,7 @@ Apart from these, the following properties are also available, and may be useful
   <td><code>spark.shuffle.localDisk.file.output.buffer</code></td>
   <td>32k</td>
   <td>
-    The file system for this buffer size after each partition is written in in all local disk shuffle writers.
+    The file system for this buffer size after each partition is written in all local disk shuffle writers.
     In KiB unless otherwise specified.
   </td>
   <td>3.5.0</td>

From b90ed1f46dedc91c47476dd944c537ec224ac567 Mon Sep 17 00:00:00 2001
From: wayneguow
Date: Sat, 13 May 2023 05:18:59 +0800
Subject: [PATCH 07/11] remove

---
 docs/core-migration-guide.md | 116 -----------------------------------
 1 file changed, 116 deletions(-)
 delete mode 100644 docs/core-migration-guide.md

diff --git a/docs/core-migration-guide.md b/docs/core-migration-guide.md
deleted file mode 100644
index f6b6d6d463db4..0000000000000
--- a/docs/core-migration-guide.md
+++ /dev/null
@@ -1,116 +0,0 @@
----
-layout: global
-title: "Migration Guide: Spark Core"
-displayTitle: "Migration Guide: Spark Core"
-license: |
-  Licensed to the Apache Software Foundation (ASF) under one or more
-  contributor license agreements. See the NOTICE file distributed with
-  this work for additional information regarding copyright ownership.
-  The ASF licenses this file to You under the Apache License, Version 2.0
-  (the "License"); you may not use this file except in compliance with
-  the License. You may obtain a copy of the License at
-
-     http://www.apache.org/licenses/LICENSE-2.0
-
-  Unless required by applicable law or agreed to in writing, software
-  distributed under the License is distributed on an "AS IS" BASIS,
-  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  See the License for the specific language governing permissions and
-  limitations under the License.
----
-
-* Table of contents
-{:toc}
-
-## Upgrading from Core 3.5 to 4.0
-
-- Since Spark 4.0, Spark migrated all its internal reference of servlet API from `javax` to `jakarta`
-
-- Since Spark 4.0, Spark will roll event logs to archive them incrementally. To restore the behavior before Spark 4.0, you can set `spark.eventLog.rolling.enabled` to `false`.
-
-- Since Spark 4.0, Spark will compress event logs. To restore the behavior before Spark 4.0, you can set `spark.eventLog.compress` to `false`.
-
-- Since Spark 4.0, Spark workers will clean up worker and stopped application directories periodically. To restore the behavior before Spark 4.0, you can set `spark.worker.cleanup.enabled` to `false`.
-
-- Since Spark 4.0, `spark.shuffle.service.db.backend` is set to `ROCKSDB` by default which means Spark will use RocksDB store for shuffle service. To restore the behavior before Spark 4.0, you can set `spark.shuffle.service.db.backend` to `LEVELDB`.
-
-- In Spark 4.0, support for Apache Mesos as a resource manager was removed.
-
-- Since Spark 4.0, Spark uses `ReadWriteOncePod` instead of `ReadWriteOnce` access mode in persistence volume claims. To restore the legacy behavior, you can set `spark.kubernetes.legacy.useReadWriteOnceAccessMode` to `true`.
-
-- Since Spark 4.0, Spark uses `~/.ivy2.5.2` as Ivy user directory by default to isolate the existing systems from Apache Ivy's incompatibility. To restore the legacy behavior, you can set `spark.jars.ivy` to `~/.ivy2`.
-
-- Since Spark 4.0, Spark uses the external shuffle service for deleting shuffle blocks for deallocated executors when the shuffle is no longer needed. To restore the legacy behavior, you can set `spark.shuffle.service.removeShuffle` to `false`.
-
-- Starting with Spark 4.0, the default logging format for `spark-submit` has changed from plain text to JSON lines to improve log analysis. If you prefer plain text logs, you have two options:
-  - Set the Spark configuration `spark.log.structuredLogging.enabled` to `false`.
-  - Use a custom log4j configuration file, such as renaming the template file `conf/log4j2.properties.pattern-layout-template` to `conf/log4j2.properties`.
-
-- Since Spark 4.0, the MDC (Mapped Diagnostic Context) key for Spark task names in Spark logs has been changed from `mdc.taskName` to `task_name`. To use the key `mdc.taskName`, you can set `spark.log.legacyTaskNameMdc.enabled` to `true`.
-
-- Since Spark 4.0, Spark performs speculative executions less agressively with `spark.speculation.multiplier=3` and `spark.speculation.quantile=0.9`. To restore the legacy behavior, you can set `spark.speculation.multiplier=1.5` and `spark.speculation.quantile=0.75`.
-
-- Since Spark 4.0, `spark.shuffle.unsafe.file.output.buffer` is deprecated though still works. Use `spark.shuffle.localDisk.file.output.buffer` instead.
-
-## Upgrading from Core 3.4 to 3.5
-
-- Since Spark 3.5, `spark.yarn.executor.failuresValidityInterval` is deprecated. Use `spark.executor.failuresValidityInterval` instead.
-
-- Since Spark 3.5, `spark.yarn.max.executor.failures` is deprecated. Use `spark.executor.maxNumFailures` instead.
-
-## Upgrading from Core 3.3 to 3.4
-
-- Since Spark 3.4, Spark driver will own `PersistentVolumnClaim`s and try to reuse if they are not assigned to live executors. To restore the behavior before Spark 3.4, you can set `spark.kubernetes.driver.ownPersistentVolumeClaim` to `false` and `spark.kubernetes.driver.reusePersistentVolumeClaim` to `false`.
-
-- Since Spark 3.4, Spark driver will track shuffle data when dynamic allocation is enabled without shuffle service. To restore the behavior before Spark 3.4, you can set `spark.dynamicAllocation.shuffleTracking.enabled` to `false`.
-
-- Since Spark 3.4, Spark will try to decommission cached RDD and shuffle blocks if both `spark.decommission.enabled` and `spark.storage.decommission.enabled` are true. To restore the behavior before Spark 3.4, you can set both `spark.storage.decommission.rddBlocks.enabled` and `spark.storage.decommission.shuffleBlocks.enabled` to `false`.
-
-- Since Spark 3.4, Spark will use RocksDB store if `spark.history.store.hybridStore.enabled` is true. To restore the behavior before Spark 3.4, you can set `spark.history.store.hybridStore.diskBackend` to `LEVELDB`.
-
-## Upgrading from Core 3.2 to 3.3
-
-- Since Spark 3.3, Spark migrates its log4j dependency from 1.x to 2.x because log4j 1.x has reached end of life and is no longer supported by the community. Vulnerabilities reported after August 2015 against log4j 1.x were not checked and will not be fixed. Users should rewrite original log4j properties files using log4j2 syntax (XML, JSON, YAML, or properties format). Spark rewrites the `conf/log4j.properties.template` which is included in Spark distribution, to `conf/log4j2.properties.template` with log4j2 properties format.
-
-## Upgrading from Core 3.1 to 3.2
-
-- Since Spark 3.2, `spark.scheduler.allocation.file` supports read remote file using hadoop filesystem which means if the path has no scheme Spark will respect hadoop configuration to read it. To restore the behavior before Spark 3.2, you can specify the local scheme for `spark.scheduler.allocation.file` e.g. `file:///path/to/file`.
-
-- Since Spark 3.2, `spark.hadoopRDD.ignoreEmptySplits` is set to `true` by default which means Spark will not create empty partitions for empty input splits. To restore the behavior before Spark 3.2, you can set `spark.hadoopRDD.ignoreEmptySplits` to `false`.
-
-- Since Spark 3.2, `spark.eventLog.compression.codec` is set to `zstd` by default which means Spark will not fallback to use `spark.io.compression.codec` anymore.
-
-- Since Spark 3.2, `spark.storage.replication.proactive` is enabled by default which means Spark tries to replenish in case of the loss of cached RDD block replicas due to executor failures. To restore the behavior before Spark 3.2, you can set `spark.storage.replication.proactive` to `false`.
-
-- In Spark 3.2, `spark.launcher.childConectionTimeout` is deprecated (typo) though still works. Use `spark.launcher.childConnectionTimeout` instead.
-
-- In Spark 3.2, support for Apache Mesos as a resource manager is deprecated and will be removed in a future version.
-
-- In Spark 3.2, Spark will delete K8s driver service resource when the application terminates by itself. To restore the behavior before Spark 3.2, you can set `spark.kubernetes.driver.service.deleteOnTermination` to `false`.
-
-## Upgrading from Core 3.0 to 3.1
-
-- In Spark 3.0 and below, `SparkContext` can be created in executors. Since Spark 3.1, an exception will be thrown when creating `SparkContext` in executors. You can allow it by setting the configuration `spark.executor.allowSparkContext` when creating `SparkContext` in executors.
-
-- In Spark 3.0 and below, Spark propagated the Hadoop classpath from `yarn.application.classpath` and `mapreduce.application.classpath` into the Spark application submitted to YARN when Spark distribution is with the built-in Hadoop. Since Spark 3.1, it does not propagate anymore when the Spark distribution is with the built-in Hadoop in order to prevent the failure from the different transitive dependencies picked up from the Hadoop cluster such as Guava and Jackson. To restore the behavior before Spark 3.1, you can set `spark.yarn.populateHadoopClasspath` to `true`.
-
-## Upgrading from Core 2.4 to 3.0
-
-- The `org.apache.spark.ExecutorPlugin` interface and related configuration has been replaced with
-  `org.apache.spark.api.plugin.SparkPlugin`, which adds new functionality. Plugins using the old
-  interface must be modified to extend the new interfaces. Check the
-  [Monitoring](monitoring.html) guide for more details.
-
-- Deprecated method `TaskContext.isRunningLocally` has been removed. Local execution was removed and it always has returned `false`.
-
-- Deprecated method `shuffleBytesWritten`, `shuffleWriteTime` and `shuffleRecordsWritten` in `ShuffleWriteMetrics` have been removed. Instead, use `bytesWritten`, `writeTime ` and `recordsWritten` respectively.
-
-- Deprecated method `AccumulableInfo.apply` have been removed because creating `AccumulableInfo` is disallowed.
-
-- Deprecated accumulator v1 APIs have been removed and please use v2 APIs instead.
-
-- Event log file will be written as UTF-8 encoding, and Spark History Server will replay event log files as UTF-8 encoding. Previously Spark wrote the event log file as default charset of driver JVM process, so Spark History Server of Spark 2.x is needed to read the old event log files in case of incompatible encoding.
-
-- A new protocol for fetching shuffle blocks is used. It's recommended that external shuffle services be upgraded when running Spark 3.0 apps. You can still use old external shuffle services by setting the configuration `spark.shuffle.useOldFetchProtocol` to `true`. Otherwise, Spark may run into errors with messages like `IllegalArgumentException: Unexpected message type: `.
-
-- `SPARK_WORKER_INSTANCES` is deprecated in Standalone mode. It's recommended to launch multiple executors in one worker and launch one worker per node instead of launching multiple workers per node and launching one executor per worker.

From 46d88f4de6608474d1dd413abb30aa18ce19a80d Mon Sep 17 00:00:00 2001
From: wayneguow
Date: Thu, 30 May 2024 19:07:00 +0800
Subject: [PATCH 08/11] fix docs

---
 docs/core-migration-guide.md | 116 +++++++++++++++++++++++++++++++++++
 1 file changed, 116 insertions(+)
 create mode 100644 docs/core-migration-guide.md

diff --git a/docs/core-migration-guide.md b/docs/core-migration-guide.md
new file mode 100644
index 0000000000000..26b0ff32cf5d9
--- /dev/null
+++ b/docs/core-migration-guide.md
@@ -0,0 +1,116 @@
+---
+layout: global
+title: "Migration Guide: Spark Core"
+displayTitle: "Migration Guide: Spark Core"
+license: |
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+---
+
+* Table of contents
+{:toc}
+
+## Upgrading from Core 3.5 to 4.0
+
+- Since Spark 4.0, Spark migrated all its internal reference of servlet API from `javax` to `jakarta`
+
+- Since Spark 4.0, Spark will roll event logs to archive them incrementally. To restore the behavior before Spark 4.0, you can set `spark.eventLog.rolling.enabled` to `false`.
+
+- Since Spark 4.0, Spark will compress event logs. To restore the behavior before Spark 4.0, you can set `spark.eventLog.compress` to `false`.
+
+- Since Spark 4.0, Spark workers will clean up worker and stopped application directories periodically. To restore the behavior before Spark 4.0, you can set `spark.worker.cleanup.enabled` to `false`.
+
+- Since Spark 4.0, `spark.shuffle.service.db.backend` is set to `ROCKSDB` by default which means Spark will use RocksDB store for shuffle service. To restore the behavior before Spark 4.0, you can set `spark.shuffle.service.db.backend` to `LEVELDB`.
+
+- In Spark 4.0, support for Apache Mesos as a resource manager was removed.
+
+- Since Spark 4.0, Spark uses `ReadWriteOncePod` instead of `ReadWriteOnce` access mode in persistence volume claims. To restore the legacy behavior, you can set `spark.kubernetes.legacy.useReadWriteOnceAccessMode` to `true`.
+
+- Since Spark 4.0, Spark uses `~/.ivy2.5.2` as Ivy user directory by default to isolate the existing systems from Apache Ivy's incompatibility. To restore the legacy behavior, you can set `spark.jars.ivy` to `~/.ivy2`.
+
+- Since Spark 4.0, Spark uses the external shuffle service for deleting shuffle blocks for deallocated executors when the shuffle is no longer needed. To restore the legacy behavior, you can set `spark.shuffle.service.removeShuffle` to `false`.
+
+- Starting with Spark 4.0, the default logging format for `spark-submit` has changed from plain text to JSON lines to improve log analysis. If you prefer plain text logs, you have two options:
+  - Set the Spark configuration `spark.log.structuredLogging.enabled` to `false`.
+  - Use a custom log4j configuration file, such as renaming the template file `conf/log4j2.properties.pattern-layout-template` to `conf/log4j2.properties`.
+
+- Since Spark 4.0, the MDC (Mapped Diagnostic Context) key for Spark task names in Spark logs has been changed from `mdc.taskName` to `task_name`. To use the key `mdc.taskName`, you can set `spark.log.legacyTaskNameMdc.enabled` to `true`.
+
+- Since Spark 4.0, Spark performs speculative executions less aggressively with `spark.speculation.multiplier=3` and `spark.speculation.quantile=0.9`. To restore the legacy behavior, you can set `spark.speculation.multiplier=1.5` and `spark.speculation.quantile=0.75`.
+
+- Since Spark 4.0, `spark.shuffle.unsafe.file.output.buffer` is deprecated though still works. Use `spark.shuffle.localDisk.file.output.buffer` instead.
+
+## Upgrading from Core 3.4 to 3.5
+
+- Since Spark 3.5, `spark.yarn.executor.failuresValidityInterval` is deprecated. Use `spark.executor.failuresValidityInterval` instead.
+
+- Since Spark 3.5, `spark.yarn.max.executor.failures` is deprecated. Use `spark.executor.maxNumFailures` instead.
+
+## Upgrading from Core 3.3 to 3.4
+
+- Since Spark 3.4, Spark driver will own `PersistentVolumnClaim`s and try to reuse if they are not assigned to live executors. To restore the behavior before Spark 3.4, you can set `spark.kubernetes.driver.ownPersistentVolumeClaim` to `false` and `spark.kubernetes.driver.reusePersistentVolumeClaim` to `false`.
+
+- Since Spark 3.4, Spark driver will track shuffle data when dynamic allocation is enabled without shuffle service. To restore the behavior before Spark 3.4, you can set `spark.dynamicAllocation.shuffleTracking.enabled` to `false`.
+
+- Since Spark 3.4, Spark will try to decommission cached RDD and shuffle blocks if both `spark.decommission.enabled` and `spark.storage.decommission.enabled` are true. To restore the behavior before Spark 3.4, you can set both `spark.storage.decommission.rddBlocks.enabled` and `spark.storage.decommission.shuffleBlocks.enabled` to `false`.
+
+- Since Spark 3.4, Spark will use RocksDB store if `spark.history.store.hybridStore.enabled` is true. To restore the behavior before Spark 3.4, you can set `spark.history.store.hybridStore.diskBackend` to `LEVELDB`.
+
+## Upgrading from Core 3.2 to 3.3
+
+- Since Spark 3.3, Spark migrates its log4j dependency from 1.x to 2.x because log4j 1.x has reached end of life and is no longer supported by the community. Vulnerabilities reported after August 2015 against log4j 1.x were not checked and will not be fixed. Users should rewrite original log4j properties files using log4j2 syntax (XML, JSON, YAML, or properties format). Spark rewrites the `conf/log4j.properties.template` which is included in Spark distribution, to `conf/log4j2.properties.template` with log4j2 properties format.
+
+## Upgrading from Core 3.1 to 3.2
+
+- Since Spark 3.2, `spark.scheduler.allocation.file` supports read remote file using hadoop filesystem which means if the path has no scheme Spark will respect hadoop configuration to read it. To restore the behavior before Spark 3.2, you can specify the local scheme for `spark.scheduler.allocation.file` e.g. `file:///path/to/file`.
+
+- Since Spark 3.2, `spark.hadoopRDD.ignoreEmptySplits` is set to `true` by default which means Spark will not create empty partitions for empty input splits. To restore the behavior before Spark 3.2, you can set `spark.hadoopRDD.ignoreEmptySplits` to `false`.
+
+- Since Spark 3.2, `spark.eventLog.compression.codec` is set to `zstd` by default which means Spark will not fallback to use `spark.io.compression.codec` anymore.
+
+- Since Spark 3.2, `spark.storage.replication.proactive` is enabled by default which means Spark tries to replenish in case of the loss of cached RDD block replicas due to executor failures. To restore the behavior before Spark 3.2, you can set `spark.storage.replication.proactive` to `false`.
+
+- In Spark 3.2, `spark.launcher.childConectionTimeout` is deprecated (typo) though still works. Use `spark.launcher.childConnectionTimeout` instead.
+
+- In Spark 3.2, support for Apache Mesos as a resource manager is deprecated and will be removed in a future version.
+
+- In Spark 3.2, Spark will delete K8s driver service resource when the application terminates by itself. To restore the behavior before Spark 3.2, you can set `spark.kubernetes.driver.service.deleteOnTermination` to `false`.
+
+## Upgrading from Core 3.0 to 3.1
+
+- In Spark 3.0 and below, `SparkContext` can be created in executors. Since Spark 3.1, an exception will be thrown when creating `SparkContext` in executors. You can allow it by setting the configuration `spark.executor.allowSparkContext` when creating `SparkContext` in executors.
+
+- In Spark 3.0 and below, Spark propagated the Hadoop classpath from `yarn.application.classpath` and `mapreduce.application.classpath` into the Spark application submitted to YARN when Spark distribution is with the built-in Hadoop. Since Spark 3.1, it does not propagate anymore when the Spark distribution is with the built-in Hadoop in order to prevent the failure from the different transitive dependencies picked up from the Hadoop cluster such as Guava and Jackson. To restore the behavior before Spark 3.1, you can set `spark.yarn.populateHadoopClasspath` to `true`.
+
+## Upgrading from Core 2.4 to 3.0
+
+- The `org.apache.spark.ExecutorPlugin` interface and related configuration has been replaced with
+  `org.apache.spark.api.plugin.SparkPlugin`, which adds new functionality. Plugins using the old
+  interface must be modified to extend the new interfaces. Check the
+  [Monitoring](monitoring.html) guide for more details.
+
+- Deprecated method `TaskContext.isRunningLocally` has been removed. Local execution was removed and it always has returned `false`.
+
+- Deprecated method `shuffleBytesWritten`, `shuffleWriteTime` and `shuffleRecordsWritten` in `ShuffleWriteMetrics` have been removed. Instead, use `bytesWritten`, `writeTime ` and `recordsWritten` respectively.
+
+- Deprecated method `AccumulableInfo.apply` have been removed because creating `AccumulableInfo` is disallowed.
+
+- Deprecated accumulator v1 APIs have been removed and please use v2 APIs instead.
+
+- Event log file will be written as UTF-8 encoding, and Spark History Server will replay event log files as UTF-8 encoding. Previously Spark wrote the event log file as default charset of driver JVM process, so Spark History Server of Spark 2.x is needed to read the old event log files in case of incompatible encoding.
+
+- A new protocol for fetching shuffle blocks is used. It's recommended that external shuffle services be upgraded when running Spark 3.0 apps. You can still use old external shuffle services by setting the configuration `spark.shuffle.useOldFetchProtocol` to `true`. Otherwise, Spark may run into errors with messages like `IllegalArgumentException: Unexpected message type: `.
+
+- `SPARK_WORKER_INSTANCES` is deprecated in Standalone mode. It's recommended to launch multiple executors in one worker and launch one worker per node instead of launching multiple workers per node and launching one executor per worker.

From 753854b99497788088eda37e4abd25522389845a Mon Sep 17 00:00:00 2001
From: wayneguow
Date: Thu, 30 May 2024 19:32:41 +0800
Subject: [PATCH 09/11] fix docs

---
 docs/configuration.md | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/docs/configuration.md b/docs/configuration.md
index a4613227c9046..d62db3d335644 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -1031,7 +1031,7 @@ Apart from these, the following properties are also available, and may be useful
   <td><code>spark.shuffle.unsafe.file.output.buffer</code></td>
   <td>32k</td>
   <td>
-    Deprecated since Spark 3.5, please use <code>spark.shuffle.localDisk.file.output.buffer</code>.
+    Deprecated since Spark 4.0, please use <code>spark.shuffle.localDisk.file.output.buffer</code>.
   </td>
   <td>2.3.0</td>
 </tr>
@@ -1042,7 +1042,7 @@ Apart from these, the following properties are also available, and may be useful
     The file system for this buffer size after each partition is written in all local disk shuffle writers.
     In KiB unless otherwise specified.
   </td>
-  <td>3.5.0</td>
+  <td>4.0.0</td>
 </tr>
 <tr>
   <td><code>spark.shuffle.spill.diskWriteBufferSize</code></td>

From 02843cf3558558127824dc390dc7f13056070502 Mon Sep 17 00:00:00 2001
From: wayneguow
Date: Thu, 30 May 2024 19:39:18 +0800
Subject: [PATCH 10/11] update version

---
 core/src/main/scala/org/apache/spark/SparkConf.scala          | 2 +-
 .../main/scala/org/apache/spark/internal/config/package.scala | 4 ++--
 2 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index 6806f83d3e1b2..cfb514913694b 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -648,7 +648,7 @@ private[spark] object SparkConf extends Logging {
         "Please use spark.yarn.executor.launch.excludeOnFailure.enabled"),
       DeprecatedConfig("spark.network.remoteReadNioBufferConversion", "3.5.2",
         "Please open a JIRA ticket to report it if you need to use this configuration."),
-      DeprecatedConfig("spark.shuffle.unsafe.file.output.buffer", "3.5.2",
+      DeprecatedConfig("spark.shuffle.unsafe.file.output.buffer", "4.0.0",
         "Please use spark.shuffle.localDisk.file.output.buffer")
     )
 
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 584d6b88e7372..6c78bd55db0b0 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -1463,7 +1463,7 @@ package object config {
 
   private[spark] val SHUFFLE_UNSAFE_FILE_OUTPUT_BUFFER_SIZE =
     ConfigBuilder("spark.shuffle.unsafe.file.output.buffer")
-      .doc("(Deprecated since Spark 3.5, please use 'spark.shuffle.localDisk.file.output.buffer'.)")
+      .doc("(Deprecated since Spark 4.0, please use 'spark.shuffle.localDisk.file.output.buffer'.)")
       .version("2.3.0")
       .bytesConf(ByteUnit.KiB)
       .checkValue(v => v > 0 && v <= ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH / 1024,
@@ -1475,7 +1475,7 @@ package object config {
     ConfigBuilder("spark.shuffle.localDisk.file.output.buffer")
       .doc("The file system for this buffer size after each partition " +
         "is written in all local disk shuffle writers. In KiB unless otherwise specified.")
-      .version("3.5.0")
+      .version("4.0.0")
       .fallbackConf(SHUFFLE_UNSAFE_FILE_OUTPUT_BUFFER_SIZE)
 
   private[spark] val SHUFFLE_DISK_WRITE_BUFFER_SIZE =

From 28f843d84bc55825bd47841be93c29d9ff7b2a4c Mon Sep 17 00:00:00 2001
From: wayneguow
Date: Tue, 4 Jun 2024 22:00:55 +0800
Subject: [PATCH 11/11] separate typo

---
 docs/core-migration-guide.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/docs/core-migration-guide.md b/docs/core-migration-guide.md
index 26b0ff32cf5d9..f6b6d6d463db4 100644
--- a/docs/core-migration-guide.md
+++ b/docs/core-migration-guide.md
@@ -48,7 +48,7 @@ license: |
 
 - Since Spark 4.0, the MDC (Mapped Diagnostic Context) key for Spark task names in Spark logs has been changed from `mdc.taskName` to `task_name`. To use the key `mdc.taskName`, you can set `spark.log.legacyTaskNameMdc.enabled` to `true`.
 
-- Since Spark 4.0, Spark performs speculative executions less aggressively with `spark.speculation.multiplier=3` and `spark.speculation.quantile=0.9`. To restore the legacy behavior, you can set `spark.speculation.multiplier=1.5` and `spark.speculation.quantile=0.75`.
+- Since Spark 4.0, Spark performs speculative executions less agressively with `spark.speculation.multiplier=3` and `spark.speculation.quantile=0.9`. To restore the legacy behavior, you can set `spark.speculation.multiplier=1.5` and `spark.speculation.quantile=0.75`.
 
 - Since Spark 4.0, `spark.shuffle.unsafe.file.output.buffer` is deprecated though still works. Use `spark.shuffle.localDisk.file.output.buffer` instead.
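
Usage sketch (illustrative only, not part of the patch series): how the renamed config is expected to be set once these patches land, assuming the fallback wiring shown in package.scala above (the new key falls back to the old one) and the deprecation entry added to SparkConf.scala. The "64k" value is an arbitrary example, not a recommendation.

    import org.apache.spark.SparkConf

    // Preferred spelling going forward: set the new key directly.
    val conf = new SparkConf()
      .set("spark.shuffle.localDisk.file.output.buffer", "64k")

    // Jobs that still set only the deprecated key keep their value, because
    // SHUFFLE_LOCAL_DISK_FILE_OUTPUT_BUFFER_SIZE is declared with
    // .fallbackConf(SHUFFLE_UNSAFE_FILE_OUTPUT_BUFFER_SIZE); the old name now
    // also triggers a deprecation warning via SparkConf's deprecatedConfigs.
    val legacyConf = new SparkConf()
      .set("spark.shuffle.unsafe.file.output.buffer", "64k")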