Skip to content

Commit

Permalink
Remove "launch Spark in new thread" feature.
Browse files Browse the repository at this point in the history
  • Loading branch information
Marcelo Vanzin committed Jan 15, 2015
1 parent 7ed8859 commit f7cacff
Show file tree
Hide file tree
Showing 2 changed files with 3 additions and 157 deletions.
101 changes: 0 additions & 101 deletions launcher/src/main/java/org/apache/spark/launcher/SparkLauncher.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,11 @@

import java.io.File;
import java.io.IOException;
import java.lang.reflect.Method;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicLong;

/**
* Launcher for Spark applications.
Expand All @@ -43,8 +39,6 @@
*/
public class SparkLauncher extends AbstractLauncher<SparkLauncher> {

private static final AtomicLong THREAD_ID = new AtomicLong();

protected boolean verbose;
protected String appName;
protected String master;
Expand Down Expand Up @@ -139,78 +133,6 @@ public SparkLauncher setVerbose(boolean verbose) {
return this;
}

/**
 * Starts a new thread that will run the Spark application.
 * <p/>
 * The application will run on a separate thread and use a separate, isolated class loader.
 * No classes or resources from the current thread's class loader will be visible to the app.
 * <p/>
 * This mode does not support certain configuration parameters, like configuring the amount of
 * driver memory or custom driver command line options. If such configuration is detected, an
 * exception will be thrown.
 * <p/>
 * This is extremely experimental and should not be used in production environments.
 * <p/>
 * NOTE: SparkSubmit uses system properties to propagate some configuration values to the app;
 * so if multiple apps are run concurrently, they may affect each other's configurations.
 * <p/>
 * NOTE: for users running JDK versions older than 8, this option can add a lot of overhead
 * to the VM's perm gen.
 *
 * @param handler Optional handler for handling exceptions in the app thread; may be null.
 * @param daemon Whether to start a daemon thread.
 * @return The thread that will run the application using SparkSubmit; daemon status matches
 *         the {@code daemon} argument. The thread will already be started.
 * @throws IOException If the properties file cannot be loaded, or if SparkSubmit cannot be
 *         located on the computed class path.
 */
public Thread start(Thread.UncaughtExceptionHandler handler, boolean daemon) throws IOException {
  // Do some sanity checking that incompatible driver options are not used, because they
  // cannot be set in this mode.
  Properties props = loadPropertiesFile();
  String extraClassPath = null;
  if (isClientMode(props)) {
    // In client mode the driver runs inside this VM, so JVM-level driver settings
    // (VM options, native library path, heap size) cannot take effect; reject them.
    checkState(
      find(DRIVER_EXTRA_JAVA_OPTIONS, conf, props) == null,
      "Cannot set driver VM options when running in-process.");
    checkState(
      find(DRIVER_EXTRA_LIBRARY_PATH, conf, props) == null,
      "Cannot set native library path when running in-process.");
    checkState(
      find(DRIVER_MEMORY, conf, props) == null,
      "Cannot set driver memory when running in-process.");
    extraClassPath = find(DRIVER_EXTRA_CLASSPATH, conf, props);
  }

  // Turn the computed class path entries into URLs for the isolated class loader.
  List<String> cp = buildClassPath(extraClassPath);
  URL[] cpUrls = new URL[cp.size()];
  int idx = 0;
  for (String entry : cp) {
    cpUrls[idx++] = new File(entry).toURI().toURL();
  }

  // Parent is null so the app sees only the bootstrap classes plus cpUrls, isolating it
  // from the current thread's class loader.
  URLClassLoader cl = new URLClassLoader(cpUrls, null);

  Thread appThread;
  try {
    // SparkSubmit must be loaded reflectively through the isolated loader.
    Class<?> sparkSubmit = cl.loadClass("org.apache.spark.deploy.SparkSubmit");
    Method main = sparkSubmit.getDeclaredMethod("main", String[].class);
    List<String> args = buildSparkSubmitArgs();
    appThread = new Thread(new SparkSubmitRunner(main, args));
  } catch (ClassNotFoundException cnfe) {
    throw new IOException(cnfe);
  } catch (NoSuchMethodException nsme) {
    throw new IOException(nsme);
  }

  // Unique name per launched app; THREAD_ID is a shared monotonically-increasing counter.
  appThread.setName("SparkLauncher-Submit-" + THREAD_ID.incrementAndGet());
  appThread.setContextClassLoader(cl);
  if (handler != null) {
    appThread.setUncaughtExceptionHandler(handler);
  }
  appThread.setDaemon(daemon);
  appThread.start();
  return appThread;
}

/**
* Launches a sub-process that will start the configured Spark application.
*
Expand Down Expand Up @@ -340,27 +262,4 @@ private boolean isClientMode(Properties userProps) {
(deployMode == null && !userMaster.startsWith("yarn-"));
}

/**
 * Adapts a reflectively-located {@code SparkSubmit.main(String[])} method to
 * {@link Runnable} so it can be executed on a dedicated application thread.
 */
private static class SparkSubmitRunner implements Runnable {

  private final Method submitMain;
  // Deliberately typed as Object: Method.invoke() is a varargs call, and storing the
  // String[] behind an Object reference makes it be passed as the single String[]
  // argument instead of being expanded into individual varargs elements.
  private final Object invokeArgs;

  SparkSubmitRunner(Method main, List<String> args) {
    this.submitMain = main;
    this.invokeArgs = args.toArray(new String[args.size()]);
  }

  @Override
  public void run() {
    try {
      submitMain.invoke(null, invokeArgs);
    } catch (ReflectiveOperationException roe) {
      // Checked reflection failures (IllegalAccessException, InvocationTargetException)
      // are surfaced as unchecked, matching Runnable's no-throws contract; runtime
      // exceptions propagate untouched.
      throw new RuntimeException(roe);
    }
  }

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -68,39 +68,6 @@ public void testChildProcLauncher() throws Exception {
assertEquals(0, app.waitFor());
}

@Test
public void testThreadAppLauncher() throws Exception {
  // Do this to avoid overwriting the main test log file.
  System.setProperty("test.name", "-testThreadAppLauncher");

  // Fail the test if the in-process app dies with an uncaught exception.
  Thread.UncaughtExceptionHandler failOnError = new Thread.UncaughtExceptionHandler() {
    @Override
    public void uncaughtException(Thread t, Throwable e) {
      String msg = "Uncaught exception in app.";
      LOG.error(msg, e);
      fail(msg);
    }
  };

  SparkLauncher launcher = new SparkLauncher()
    .setSparkHome(System.getProperty("spark.test.home"))
    .setMaster("local")
    .setAppResource("spark-internal")
    .setConf(SparkLauncher.DRIVER_EXTRA_CLASSPATH, System.getProperty("java.class.path"))
    .setMainClass(SparkLauncherTestApp.class.getName())
    .addAppArgs("thread");

  printArgs(launcher.buildShellCommand());

  // Launch as a daemon thread and block until the app finishes.
  Thread appThread = launcher.start(failOnError, true);
  appThread.join();
}

@Test
public void testInProcessDriverArgValidator() throws Exception {
  // Each of these driver options requires a separate driver JVM, so the in-process
  // launcher must reject all of them.
  String[] unsupportedDriverConfs = {
    SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS,
    SparkLauncher.DRIVER_MEMORY,
    SparkLauncher.DRIVER_EXTRA_LIBRARY_PATH,
  };
  for (String key : unsupportedDriverConfs) {
    testInvalidDriverConf(key);
  }
}

private void testCmdBuilder(boolean isDriver) throws Exception {
String deployMode = isDriver ? "client" : "cluster";

Expand Down Expand Up @@ -182,22 +149,6 @@ private void testCmdBuilder(boolean isDriver) throws Exception {
assertEquals("foo", conf.get("spark.foo"));
}

/**
 * Asserts that starting an in-process app with the given driver configuration key set
 * fails with an {@link IllegalStateException} mentioning the in-process restriction.
 *
 * @param key Driver configuration key that is not supported when running in-process.
 * @throws Exception If launcher setup fails for an unrelated reason.
 */
private void testInvalidDriverConf(String key) throws Exception {
  try {
    new SparkLauncher()
      .setSparkHome(System.getProperty("spark.test.home"))
      .setAppResource("spark-internal")
      .setMainClass(SparkLauncherTestApp.class.getName())
      .addAppArgs("thread")
      .setConf(key, "foo")
      .start(null, true);
    fail("Should have failed to start app.");
  } catch (IllegalStateException e) {
    // Use contains() instead of indexOf(...) > 0: the old check would spuriously fail
    // if the expected phrase ever appeared at the start of the message (index 0).
    assertTrue("Correct exception should be thrown.",
      e.getMessage().contains("running in-process"));
  }
}

private String findArgValue(List<String> cmd, String name) {
for (int i = 0; i < cmd.size(); i++) {
if (cmd.get(i).equals(name)) {
Expand Down Expand Up @@ -252,13 +203,9 @@ private void printArgs(List<String> cmd) {
public static class SparkLauncherTestApp {

public static void main(String[] args) throws Exception {
  // The launcher is expected to pass exactly one argument: the "proc" marker.
  assertEquals(1, args.length);
  String mode = args[0];
  assertEquals("proc", mode);
  // Configuration values propagated by the launcher through system properties.
  assertEquals("bar", System.getProperty("foo"));
  assertEquals("local", System.getProperty(SparkLauncher.SPARK_MASTER));
}

Expand Down

0 comments on commit f7cacff

Please sign in to comment.