diff --git a/launcher/src/main/java/org/apache/spark/launcher/SparkLauncher.java b/launcher/src/main/java/org/apache/spark/launcher/SparkLauncher.java
index 43c8326978440..e919a95f9e3aa 100644
--- a/launcher/src/main/java/org/apache/spark/launcher/SparkLauncher.java
+++ b/launcher/src/main/java/org/apache/spark/launcher/SparkLauncher.java
@@ -19,15 +19,11 @@
 
 import java.io.File;
 import java.io.IOException;
-import java.lang.reflect.Method;
-import java.net.URL;
-import java.net.URLClassLoader;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
-import java.util.concurrent.atomic.AtomicLong;
 
 /**
  * Launcher for Spark applications.
@@ -43,8 +39,6 @@
  */
 public class SparkLauncher extends AbstractLauncher {
 
-  private static final AtomicLong THREAD_ID = new AtomicLong();
-
   protected boolean verbose;
   protected String appName;
   protected String master;
@@ -139,78 +133,6 @@ public SparkLauncher setVerbose(boolean verbose) {
     return this;
   }
 
-  /**
-   * Starts a new thread that will run the Spark application.
-   * <p/>
-   * The application will run on a separate thread and use a separate, isolated class loader.
-   * No classes or resources from the current thread's class loader will be visible to the app.
-   * <p/>
-   * This mode does not support certain configuration parameters, like configuring the amount of
-   * driver memory or custom driver command line options. If such configuration is detected, an
-   * exception will be thrown.
-   * <p/>
-   * This is extremely experimental and should not be used in production environments.
-   * <p/>
-   * NOTE: SparkSubmit uses system properties to propagate some configuration values to the app;
-   * if multiple apps are run concurrently, they may affect each other's configurations.
-   * <p/>
-   * NOTE: for users running JDK versions older than 8, this option can add a lot of overhead
-   * to the VM's perm gen.
-   *
-   * @param handler Optional handler for handling exceptions in the app thread.
-   * @param daemon Whether to start a daemon thread.
-   * @return A thread that will run the application using SparkSubmit. The thread will
-   *         already be started.
-   */
-  public Thread start(Thread.UncaughtExceptionHandler handler, boolean daemon) throws IOException {
-    // Do some sanity checking that incompatible driver options are not used, because they
-    // cannot be set in this mode.
-    Properties props = loadPropertiesFile();
-    String extraClassPath = null;
-    if (isClientMode(props)) {
-      checkState(
-        find(DRIVER_EXTRA_JAVA_OPTIONS, conf, props) == null,
-        "Cannot set driver VM options when running in-process.");
-      checkState(
-        find(DRIVER_EXTRA_LIBRARY_PATH, conf, props) == null,
-        "Cannot set native library path when running in-process.");
-      checkState(
-        find(DRIVER_MEMORY, conf, props) == null,
-        "Cannot set driver memory when running in-process.");
-      extraClassPath = find(DRIVER_EXTRA_CLASSPATH, conf, props);
-    }
-
-    List<String> cp = buildClassPath(extraClassPath);
-    URL[] cpUrls = new URL[cp.size()];
-    int idx = 0;
-    for (String entry : cp) {
-      cpUrls[idx++] = new File(entry).toURI().toURL();
-    }
-
-    URLClassLoader cl = new URLClassLoader(cpUrls, null);
-
-    Thread appThread;
-    try {
-      Class<?> sparkSubmit = cl.loadClass("org.apache.spark.deploy.SparkSubmit");
-      Method main = sparkSubmit.getDeclaredMethod("main", String[].class);
-      List<String> args = buildSparkSubmitArgs();
-      appThread = new Thread(new SparkSubmitRunner(main, args));
-    } catch (ClassNotFoundException cnfe) {
-      throw new IOException(cnfe);
-    } catch (NoSuchMethodException nsme) {
-      throw new IOException(nsme);
-    }
-
-    appThread.setName("SparkLauncher-Submit-" + THREAD_ID.incrementAndGet());
-    appThread.setContextClassLoader(cl);
-    if (handler != null) {
-      appThread.setUncaughtExceptionHandler(handler);
-    }
-    appThread.setDaemon(daemon);
-    appThread.start();
-    return appThread;
-  }
-
   /**
    * Launches a sub-process that will start the configured Spark application.
    *
@@ -340,27 +262,4 @@ private boolean isClientMode(Properties userProps) {
       (deployMode == null && !userMaster.startsWith("yarn-"));
   }
 
-  private static class SparkSubmitRunner implements Runnable {
-
-    private final Method main;
-    private final Object args;
-
-    SparkSubmitRunner(Method main, List<String> args) {
-      this.main = main;
-      this.args = args.toArray(new String[args.size()]);
-    }
-
-    @Override
-    public void run() {
-      try {
-        main.invoke(null, args);
-      } catch (RuntimeException re) {
-        throw re;
-      } catch (Exception e) {
-        throw new RuntimeException(e);
-      }
-    }
-
-  }
-
 }
diff --git a/launcher/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java b/launcher/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java
index b43bcc5807248..fce2003069c01 100644
--- a/launcher/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java
+++ b/launcher/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java
@@ -68,39 +68,6 @@ public void testChildProcLauncher() throws Exception {
     assertEquals(0, app.waitFor());
   }
 
-  @Test
-  public void testThreadAppLauncher() throws Exception {
-    // Do this to avoid overwriting the main test log file.
-    System.setProperty("test.name", "-testThreadAppLauncher");
-
-    SparkLauncher launcher = new SparkLauncher()
-      .setSparkHome(System.getProperty("spark.test.home"))
-      .setMaster("local")
-      .setAppResource("spark-internal")
-      .setConf(SparkLauncher.DRIVER_EXTRA_CLASSPATH, System.getProperty("java.class.path"))
-      .setMainClass(SparkLauncherTestApp.class.getName())
-      .addAppArgs("thread");
-
-    printArgs(launcher.buildShellCommand());
-
-    Thread app = launcher.start(new Thread.UncaughtExceptionHandler() {
-      @Override
-      public void uncaughtException(Thread t, Throwable e) {
-        String msg = "Uncaught exception in app.";
-        LOG.error(msg, e);
-        fail(msg);
-      }
-    }, true);
-    app.join();
-  }
-
-  @Test
-  public void testInProcessDriverArgValidator() throws Exception {
-    testInvalidDriverConf(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS);
-    testInvalidDriverConf(SparkLauncher.DRIVER_MEMORY);
-    testInvalidDriverConf(SparkLauncher.DRIVER_EXTRA_LIBRARY_PATH);
-  }
-
   private void testCmdBuilder(boolean isDriver) throws Exception {
     String deployMode = isDriver ? "client" : "cluster";
 
@@ -182,22 +149,6 @@ private void testCmdBuilder(boolean isDriver) throws Exception {
     assertEquals("foo", conf.get("spark.foo"));
   }
 
-  private void testInvalidDriverConf(String key) throws Exception {
-    try {
-      new SparkLauncher()
-        .setSparkHome(System.getProperty("spark.test.home"))
-        .setAppResource("spark-internal")
-        .setMainClass(SparkLauncherTestApp.class.getName())
-        .addAppArgs("thread")
-        .setConf(key, "foo")
-        .start(null, true);
-      fail("Should have failed to start app.");
-    } catch (IllegalStateException e) {
-      assertTrue("Correct exception should be thrown.",
-        e.getMessage().indexOf("running in-process") > 0);
-    }
-  }
-
   private String findArgValue(List<String> cmd, String name) {
     for (int i = 0; i < cmd.size(); i++) {
       if (cmd.get(i).equals(name)) {
@@ -252,13 +203,9 @@ private void printArgs(List<String> cmd) {
   public static class SparkLauncherTestApp {
 
     public static void main(String[] args) throws Exception {
-      if (args[0].equals("proc")) {
-        assertEquals("bar", System.getProperty("foo"));
-      } else if (args[0].equals("arg")) {
-        assertEquals("newline=", args[1]);
-      } else {
-        assertEquals("thread", args[0]);
-      }
+      assertEquals(1, args.length);
+      assertEquals("proc", args[0]);
+      assertEquals("bar", System.getProperty("foo"));
       assertEquals("local", System.getProperty(SparkLauncher.SPARK_MASTER));
     }
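For context on what this diff removes: the in-process `start()` path boiled down to loading `org.apache.spark.deploy.SparkSubmit` through an isolated `URLClassLoader` (parent set to `null`, so nothing from the caller's class loader leaks into the app) and invoking its static `main(String[])` reflectively on a freshly started thread. The sketch below is a minimal reproduction of that pattern only, not part of the launcher API; the class and method names (`IsolatedMainRunner`, `runIsolated`) are illustrative.

```java
import java.io.File;
import java.lang.reflect.Method;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.List;

final class IsolatedMainRunner {

  /**
   * Hypothetical helper mirroring the removed in-process mode: load mainClass from an
   * isolated class loader and invoke its static main(String[]) on a new thread.
   */
  static Thread runIsolated(List<String> classpath, String mainClass, List<String> args,
      boolean daemon) throws Exception {
    // Turn classpath entries into URLs for the isolated loader (parent = null).
    URL[] urls = new URL[classpath.size()];
    for (int i = 0; i < classpath.size(); i++) {
      urls[i] = new File(classpath.get(i)).toURI().toURL();
    }
    URLClassLoader loader = new URLClassLoader(urls, null);

    Class<?> cls = loader.loadClass(mainClass);
    Method main = cls.getDeclaredMethod("main", String[].class);
    String[] argv = args.toArray(new String[0]);

    Thread t = new Thread(() -> {
      try {
        // Static method, so the receiver is null; argv is passed as the single String[] argument.
        main.invoke(null, (Object) argv);
      } catch (Exception e) {
        throw new RuntimeException(e);
      }
    });
    t.setContextClassLoader(loader);
    t.setDaemon(daemon);
    t.start();
    return t;
  }
}
```

The sub-process path that this change keeps avoids the class-loader isolation and shared-system-property pitfalls noted in the removed Javadoc: in the released launcher API, `SparkLauncher.launch()` builds the same spark-submit invocation but runs it as a child process, returning a `java.lang.Process` the caller can monitor and `waitFor()`.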