From ba6aa6c829bfcca1b4b3d5a33fe3a7460e7db1f0 Mon Sep 17 00:00:00 2001 From: Imran Rashid Date: Mon, 25 Jun 2018 20:15:37 -0500 Subject: [PATCH 1/5] [SPARK-24918][Core] Executor Plugin api This provides a very simple api for users to specify arbitrary code to run within an executor, eg. for debugging or added instrumentation. The intial api is very simple, but creates an abstract base class to allow future additions. --- .../apache/spark/AbstractExecutorPlugin.java | 39 +++++++++++++++++++ .../org/apache/spark/executor/Executor.scala | 6 +++ .../spark/internal/config/package.scala | 11 ++++++ 3 files changed, 56 insertions(+) create mode 100644 core/src/main/java/org/apache/spark/AbstractExecutorPlugin.java diff --git a/core/src/main/java/org/apache/spark/AbstractExecutorPlugin.java b/core/src/main/java/org/apache/spark/AbstractExecutorPlugin.java new file mode 100644 index 0000000000000..af7e05ac40dad --- /dev/null +++ b/core/src/main/java/org/apache/spark/AbstractExecutorPlugin.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark; + +import org.apache.spark.annotation.DeveloperApi; + +/** + * A plugin which can be automaticaly instantiated within each Spark executor. Users can specify + * plugins which should be created with the "spark.executor.plugins" configuration. An instance + * of each plugin will be created for every executor, including those created by dynamic allocation, + * before the executor starts running any tasks. + * + * The specific api exposed to the end users still considered to be very unstable. If implementors + * extend this base class, we will *hopefully* be able to keep compatability by providing dummy + * implementations for any methods added, but make no guarantees this will always be possible across + * all spark releases. + * + * Spark does nothing to verify the plugin is doing legitimate things, or to manage the resources + * it uses. A plugin acquires the same privileges as the user running the task. A bad plugin + * could also intefere with task execution and make the executor fail in unexpected ways. + */ +@DeveloperApi +public class AbstractExecutorPlugin { +} diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index b1856ff0f3247..df342aef842cb 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -130,6 +130,12 @@ private[spark] class Executor( private val urlClassLoader = createClassLoader() private val replClassLoader = addReplClassLoaderIfNeeded(urlClassLoader) + Thread.currentThread().setContextClassLoader(replClassLoader) + conf.get(EXECUTOR_PLUGINS).foreach { classes => + Utils.loadExtensions(classOf[AbstractExecutorPlugin], classes, conf) + } + + // Set the classloader for serializer env.serializer.setDefaultClassLoader(replClassLoader) // SPARK-21928. SerializerManager's internal instance of Kryo might get used in netty threads diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index 8fef2aa6863c5..da6f4c7c01167 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -567,4 +567,15 @@ package object config { .intConf .checkValue(v => v > 0, "The value should be a positive integer.") .createWithDefault(2000) + + private[spark] val EXECUTOR_PLUGINS = + ConfigBuilder("spark.executor.plugins") + .internal() + .doc("Comma-separated list of class names for \"plugins\" implementing " + + "org.apache.spark.AbstractExecutorPlugin. Plugins have the same privileges as any task " + + "in a spark executor. They can also interfere with task execution and fail in " + + "unexpected ways. So be sure to only use this for trusted plugins.") + .stringConf + .toSequence + .createOptional } From f8c99e30cc8879452702a9310bb5a901b2042a10 Mon Sep 17 00:00:00 2001 From: Imran Rashid Date: Thu, 2 Aug 2018 12:08:55 -0500 Subject: [PATCH 2/5] switch to interface --- .../java/org/apache/spark/AbstractExecutorPlugin.java | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/core/src/main/java/org/apache/spark/AbstractExecutorPlugin.java b/core/src/main/java/org/apache/spark/AbstractExecutorPlugin.java index af7e05ac40dad..c17ac1a3b09f3 100644 --- a/core/src/main/java/org/apache/spark/AbstractExecutorPlugin.java +++ b/core/src/main/java/org/apache/spark/AbstractExecutorPlugin.java @@ -25,15 +25,14 @@ * of each plugin will be created for every executor, including those created by dynamic allocation, * before the executor starts running any tasks. * - * The specific api exposed to the end users still considered to be very unstable. If implementors - * extend this base class, we will *hopefully* be able to keep compatability by providing dummy - * implementations for any methods added, but make no guarantees this will always be possible across - * all spark releases. + * The specific api exposed to the end users still considered to be very unstable. We will + * *hopefully* be able to keep compatability by providing default implementations for any methods + * added, but make no guarantees this will always be possible across all spark releases. * * Spark does nothing to verify the plugin is doing legitimate things, or to manage the resources * it uses. A plugin acquires the same privileges as the user running the task. A bad plugin * could also intefere with task execution and make the executor fail in unexpected ways. */ @DeveloperApi -public class AbstractExecutorPlugin { +interface ExecutorPlugin { } From 7d43c7777b0882e90ff2268016ff1f3b4b237149 Mon Sep 17 00:00:00 2001 From: Imran Rashid Date: Thu, 2 Aug 2018 12:27:20 -0500 Subject: [PATCH 3/5] fix --- .../spark/{AbstractExecutorPlugin.java => ExecutorPlugin.java} | 0 core/src/main/scala/org/apache/spark/executor/Executor.scala | 2 +- 2 files changed, 1 insertion(+), 1 deletion(-) rename core/src/main/java/org/apache/spark/{AbstractExecutorPlugin.java => ExecutorPlugin.java} (100%) diff --git a/core/src/main/java/org/apache/spark/AbstractExecutorPlugin.java b/core/src/main/java/org/apache/spark/ExecutorPlugin.java similarity index 100% rename from core/src/main/java/org/apache/spark/AbstractExecutorPlugin.java rename to core/src/main/java/org/apache/spark/ExecutorPlugin.java diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index df342aef842cb..a66712417f4b2 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -132,7 +132,7 @@ private[spark] class Executor( Thread.currentThread().setContextClassLoader(replClassLoader) conf.get(EXECUTOR_PLUGINS).foreach { classes => - Utils.loadExtensions(classOf[AbstractExecutorPlugin], classes, conf) + Utils.loadExtensions(classOf[ExecutorPlugin], classes, conf) } From 3297195bb00ca3fbd15f57299bdf789b02cc04dd Mon Sep 17 00:00:00 2001 From: Imran Rashid Date: Thu, 2 Aug 2018 13:01:01 -0500 Subject: [PATCH 4/5] fix --- core/src/main/java/org/apache/spark/ExecutorPlugin.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/java/org/apache/spark/ExecutorPlugin.java b/core/src/main/java/org/apache/spark/ExecutorPlugin.java index c17ac1a3b09f3..7e99ef844d5bc 100644 --- a/core/src/main/java/org/apache/spark/ExecutorPlugin.java +++ b/core/src/main/java/org/apache/spark/ExecutorPlugin.java @@ -34,5 +34,5 @@ * could also intefere with task execution and make the executor fail in unexpected ways. */ @DeveloperApi -interface ExecutorPlugin { +public interface ExecutorPlugin { } From 8fb739bb5e650ecb6f63a37c490405a3fbce2f9c Mon Sep 17 00:00:00 2001 From: Imran Rashid Date: Mon, 6 Aug 2018 12:14:26 -0500 Subject: [PATCH 5/5] doc update --- .../main/scala/org/apache/spark/internal/config/package.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index da6f4c7c01167..add2b0c7eaa28 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -572,7 +572,7 @@ package object config { ConfigBuilder("spark.executor.plugins") .internal() .doc("Comma-separated list of class names for \"plugins\" implementing " + - "org.apache.spark.AbstractExecutorPlugin. Plugins have the same privileges as any task " + + "org.apache.spark.ExecutorPlugin. Plugins have the same privileges as any task " + "in a spark executor. They can also interfere with task execution and fail in " + "unexpected ways. So be sure to only use this for trusted plugins.") .stringConf