From c617539320b001d53fab74cb8bf04675768fa37f Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Tue, 13 Jan 2015 15:45:04 -0800 Subject: [PATCH] Review feedback round 1. --- bin/pyspark2.cmd | 1 - .../spark/deploy/worker/CommandUtils.scala | 2 +- .../spark/launcher/AbstractLauncher.java | 223 ++++-------------- .../apache/spark/launcher/LauncherCommon.java | 134 ++++++++++- .../java/org/apache/spark/launcher/Main.java | 10 +- .../spark/launcher/PySparkLauncher.java | 28 ++- .../spark/launcher/SparkClassLauncher.java | 7 +- .../apache/spark/launcher/SparkLauncher.java | 102 ++++---- .../launcher/SparkSubmitCliLauncher.java | 46 ++-- .../launcher/SparkSubmitOptionParser.java | 13 +- ...herSuite.java => LauncherCommonSuite.java} | 16 +- .../spark/launcher/SparkLauncherSuite.java | 32 +-- 12 files changed, 305 insertions(+), 309 deletions(-) rename launcher/src/test/java/org/apache/spark/launcher/{AbstractLauncherSuite.java => LauncherCommonSuite.java} (88%) diff --git a/bin/pyspark2.cmd b/bin/pyspark2.cmd index b4e6b0a43c470..ff5862d049f9f 100644 --- a/bin/pyspark2.cmd +++ b/bin/pyspark2.cmd @@ -35,5 +35,4 @@ set PYTHONPATH=%SPARK_HOME%\python\lib\py4j-0.8.2.1-src.zip;%PYTHONPATH% set OLD_PYTHONSTARTUP=%PYTHONSTARTUP% set PYTHONSTARTUP=%SPARK_HOME%\python\pyspark\shell.py -echo Running %PYSPARK_PYTHON% with PYTHONPATH=%PYTHONPATH% call %SPARK_HOME%\bin\spark-class2.cmd pyspark %* diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala b/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala index 4450ff4e8b485..d1947523ad3c8 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala @@ -117,7 +117,7 @@ private class CommandLauncher(sparkHome: String, memory: Int, env: Map[String, S setSparkHome(sparkHome) override def buildLauncherCommand(): JList[String] = { - val cmd = createJavaCommand() + val cmd = buildJavaCommand() cmd.add("-cp") cmd.add(buildClassPath(null).mkString(File.pathSeparator)) cmd.add(s"-Xms${memory}M") diff --git a/launcher/src/main/java/org/apache/spark/launcher/AbstractLauncher.java b/launcher/src/main/java/org/apache/spark/launcher/AbstractLauncher.java index 264ec82431897..995db4b726f42 100644 --- a/launcher/src/main/java/org/apache/spark/launcher/AbstractLauncher.java +++ b/launcher/src/main/java/org/apache/spark/launcher/AbstractLauncher.java @@ -25,12 +25,11 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; -import java.util.Enumeration; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; -import java.util.jar.JarEntry; import java.util.jar.JarFile; import java.util.regex.Pattern; @@ -39,6 +38,7 @@ */ public abstract class AbstractLauncher extends LauncherCommon { + private static final String DEFAULT_PROPERTIES_FILE = "spark-defaults.conf"; protected static final String DEFAULT_MEM = "512m"; protected String javaHome; @@ -93,6 +93,11 @@ public T setConf(String key, String value) { */ protected abstract List buildLauncherCommand() throws IOException; + /** + * Loads the configuration file for the application, if it exists. This is either the + * user-specified properties file, or the spark-defaults.conf file under the Spark configuration + * directory. 
+ */ protected Properties loadPropertiesFile() throws IOException { Properties props = new Properties(); File propsFile; @@ -100,11 +105,7 @@ protected Properties loadPropertiesFile() throws IOException { propsFile = new File(propertiesFile); checkArgument(propsFile.isFile(), "Invalid properties file '%s'.", propertiesFile); } else { - String confDir = getenv("SPARK_CONF_DIR"); - if (confDir == null) { - confDir = join(File.separator, getSparkHome(), "conf"); - } - propsFile = new File(confDir, "spark-defaults.conf"); + propsFile = new File(getConfDir(), DEFAULT_PROPERTIES_FILE); } if (propsFile.isFile()) { @@ -127,16 +128,16 @@ protected Properties loadPropertiesFile() throws IOException { } protected String getSparkHome() { - String path = first(sparkHome, getenv("SPARK_HOME")); + String path = firstNonEmpty(sparkHome, getenv("SPARK_HOME")); checkState(path != null, - "Spark home not found; set it explicitly or use the SPARK_HOME environment variable."); + "Spark home not found; set it explicitly or use the SPARK_HOME environment variable."); return path; } - protected List createJavaCommand() throws IOException { + protected List buildJavaCommand() throws IOException { List cmd = new ArrayList(); if (javaHome == null) { - cmd.add(join(File.separator, System.getProperty("java.home"), "..", "bin", "java")); + cmd.add(join(File.separator, System.getProperty("java.home"), "bin", "java")); } else { cmd.add(join(File.separator, javaHome, "bin", "java")); } @@ -186,12 +187,7 @@ protected List buildClassPath(String appClassPath) throws IOException { addToClassPath(cp, getenv("SPARK_CLASSPATH")); addToClassPath(cp, appClassPath); - String confDir = getenv("SPARK_CONF_DIR"); - if (!isEmpty(confDir)) { - addToClassPath(cp, confDir); - } else { - addToClassPath(cp, join(File.separator, getSparkHome(), "conf")); - } + addToClassPath(cp, getConfDir()); boolean prependClasses = !isEmpty(getenv("SPARK_PREPEND_CLASSES")); boolean isTesting = "1".equals(getenv("SPARK_TESTING")); @@ -236,7 +232,7 @@ protected List buildClassPath(String appClassPath) throws IOException { } catch (IOException ioe) { if (ioe.getMessage().indexOf("invalid CEN header") > 0) { System.err.println( - "Loading Spark jar with '$JAR_CMD' failed.\n" + + "Loading Spark jar failed.\n" + "This is likely because Spark was compiled with Java 7 and run\n" + "with Java 6 (see SPARK-1703). Please use Java 7 to run Spark\n" + "or build Spark with Java 6."); @@ -279,6 +275,12 @@ protected List buildClassPath(String appClassPath) throws IOException { return cp; } + /** + * Adds entries to the classpath. + * + * @param cp List where to appended the new classpath entries. + * @param entries New classpath entries (separated by File.pathSeparator). + */ private void addToClassPath(List cp, String entries) { if (isEmpty(entries)) { return; @@ -317,7 +319,14 @@ protected String getScalaVersion() { throw new IllegalStateException("Should not reach here."); } + protected List prepareForOs(List cmd, String libPath) { + return prepareForOs(cmd, libPath, Collections.emptyMap()); + } + /** + * Prepare the command for execution under the current OS, setting the passed environment + * variables. + * * Which OS is running defines two things: * - the name of the environment variable used to define the lookup path for native libs * - how to execute the command in general. @@ -329,7 +338,8 @@ protected String getScalaVersion() { * * For Win32, see {@link #prepareForWindows(List,String)}. 
*/ - protected List prepareForOs(List cmd, + protected List prepareForOs( + List cmd, String libPath, Map env) { @@ -365,128 +375,6 @@ protected List prepareForOs(List cmd, return newCmd; } - protected String shQuote(String s) { - StringBuilder quoted = new StringBuilder(); - boolean hasWhitespace = false; - for (int i = 0; i < s.length(); i++) { - if (Character.isWhitespace(s.codePointAt(i))) { - quoted.append('"'); - hasWhitespace = true; - break; - } - } - - for (int i = 0; i < s.length(); i++) { - int cp = s.codePointAt(i); - switch (cp) { - case '\'': - if (hasWhitespace) { - quoted.appendCodePoint(cp); - break; - } - case '"': - case '\\': - quoted.append('\\'); - // Fall through. - default: - if (Character.isWhitespace(cp)) { - hasWhitespace=true; - } - quoted.appendCodePoint(cp); - } - } - if (hasWhitespace) { - quoted.append('"'); - } - return quoted.toString(); - } - - // Visible for testing. - List parseOptionString(String s) { - List opts = new ArrayList(); - StringBuilder opt = new StringBuilder(); - boolean inOpt = false; - boolean inSingleQuote = false; - boolean inDoubleQuote = false; - boolean escapeNext = false; - boolean hasData = false; - - for (int i = 0; i < s.length(); i++) { - int c = s.codePointAt(i); - if (escapeNext) { - if (!inOpt) { - inOpt = true; - } - opt.appendCodePoint(c); - escapeNext = false; - } else if (inOpt) { - switch (c) { - case '\\': - if (inSingleQuote) { - opt.appendCodePoint(c); - } else { - escapeNext = true; - } - break; - case '\'': - if (inDoubleQuote) { - opt.appendCodePoint(c); - } else { - inSingleQuote = !inSingleQuote; - } - break; - case '"': - if (inSingleQuote) { - opt.appendCodePoint(c); - } else { - inDoubleQuote = !inDoubleQuote; - } - break; - default: - if (inSingleQuote || inDoubleQuote || !Character.isWhitespace(c)) { - opt.appendCodePoint(c); - } else { - finishOpt(opts, opt); - inOpt = false; - hasData = false; - } - } - } else { - switch (c) { - case '\'': - inSingleQuote = true; - inOpt = true; - hasData = true; - break; - case '"': - inDoubleQuote = true; - inOpt = true; - hasData = true; - break; - case '\\': - escapeNext = true; - break; - default: - if (!Character.isWhitespace(c)) { - inOpt = true; - opt.appendCodePoint(c); - } - } - } - } - - checkArgument(!inSingleQuote && !inDoubleQuote && !escapeNext, "Invalid option string: %s", s); - if (opt.length() > 0 || hasData) { - opts.add(opt.toString()); - } - return opts; - } - - private void finishOpt(List opts, StringBuilder opt) { - opts.add(opt.toString()); - opt.setLength(0); - } - private String findAssembly(String scalaVersion) { String sparkHome = getSparkHome(); File libdir; @@ -512,7 +400,12 @@ public boolean accept(File file) { } private String getenv(String key) { - return first(env != null ? env.get(key) : null, System.getenv(key)); + return firstNonEmpty(env != null ? env.get(key) : null, System.getenv(key)); + } + + private String getConfDir() { + String confDir = getenv("SPARK_CONF_DIR"); + return confDir != null ? confDir : join(File.separator, getSparkHome(), "conf"); } /** @@ -526,8 +419,12 @@ private String getenv(String key) { * - Quote all arguments so that spaces are handled as expected. Quotes within arguments are * "double quoted" (which is batch for escaping a quote). 
This page has more details about * quoting and other batch script fun stuff: http://ss64.com/nt/syntax-esc.html + * + * The command is executed using "cmd /c" and formatted in a single line, since that's the + * easiest way to consume this from a batch script (see spark-class2.cmd). */ - private List prepareForWindows(List cmd, + private List prepareForWindows( + List cmd, String libPath, Map env) { StringBuilder cmdline = new StringBuilder("cmd /c \""); @@ -535,6 +432,9 @@ private List prepareForWindows(List cmd, cmdline.append("set PATH=%PATH%;").append(libPath).append(" &&"); } for (Map.Entry e : env.entrySet()) { + if (cmdline.length() > 0) { + cmdline.append(" "); + } cmdline.append(String.format("set %s=%s", e.getKey(), e.getValue())); cmdline.append(" &&"); } @@ -542,27 +442,25 @@ private List prepareForWindows(List cmd, if (cmdline.length() > 0) { cmdline.append(" "); } - cmdline.append(quote(arg)); + cmdline.append(quoteForBatchScript(arg)); } cmdline.append("\""); return Arrays.asList(cmdline.toString()); } /** - * Quoting arguments that don't need quoting in Windows seems to cause weird issues. So only - * quote arguments when there is whitespace in them. + * Quote a command argument for a command to be run by a Windows batch script, if the argument + * needs quoting. Arguments only seem to need quotes in batch scripts if they have whitespace. */ - private boolean needsQuoting(String arg) { + private String quoteForBatchScript(String arg) { + boolean needsQuotes = false; for (int i = 0; i < arg.length(); i++) { if (Character.isWhitespace(arg.codePointAt(i))) { - return true; + needsQuotes = true; + break; } } - return false; - } - - private String quote(String arg) { - if (!needsQuoting(arg)) { + if (!needsQuotes) { return arg; } StringBuilder quoted = new StringBuilder(); @@ -578,23 +476,4 @@ private String quote(String arg) { return quoted.toString(); } - // Visible for testing. - String getLibPathEnvName() { - if (isWindows()) { - return "PATH"; - } - - String os = System.getProperty("os.name"); - if (os.startsWith("Mac OS X")) { - return "DYLD_LIBRARY_PATH"; - } else { - return "LD_LIBRARY_PATH"; - } - } - - protected boolean isWindows() { - String os = System.getProperty("os.name"); - return os.startsWith("Windows"); - } - } diff --git a/launcher/src/main/java/org/apache/spark/launcher/LauncherCommon.java b/launcher/src/main/java/org/apache/spark/launcher/LauncherCommon.java index 004c43019592c..a76bc6ebb5fcf 100644 --- a/launcher/src/main/java/org/apache/spark/launcher/LauncherCommon.java +++ b/launcher/src/main/java/org/apache/spark/launcher/LauncherCommon.java @@ -17,6 +17,8 @@ package org.apache.spark.launcher; +import java.util.ArrayList; +import java.util.List; import java.util.Map; /** @@ -30,27 +32,29 @@ public class LauncherCommon { /** Configuration key for the driver memory. */ public static final String DRIVER_MEMORY = "spark.driver.memory"; /** Configuration key for the driver class path. */ - public static final String DRIVER_CLASSPATH = "spark.driver.extraClassPath"; + public static final String DRIVER_EXTRA_CLASSPATH = "spark.driver.extraClassPath"; /** Configuration key for the driver VM options. */ - public static final String DRIVER_JAVA_OPTIONS = "spark.driver.extraJavaOptions"; + public static final String DRIVER_EXTRA_JAVA_OPTIONS = "spark.driver.extraJavaOptions"; /** Configuration key for the driver native library path. 
*/ - public static final String DRIVER_LIBRARY_PATH = "spark.driver.extraLibraryPath"; + public static final String DRIVER_EXTRA_LIBRARY_PATH = "spark.driver.extraLibraryPath"; /** Configuration key for the executor memory. */ public static final String EXECUTOR_MEMORY = "spark.executor.memory"; /** Configuration key for the executor class path. */ - public static final String EXECUTOR_CLASSPATH = "spark.executor.extraClassPath"; + public static final String EXECUTOR_EXTRA_CLASSPATH = "spark.executor.extraClassPath"; /** Configuration key for the executor VM options. */ - public static final String EXECUTOR_JAVA_OPTIONS = "spark.executor.extraJavaOptions"; + public static final String EXECUTOR_EXTRA_JAVA_OPTIONS = "spark.executor.extraJavaOptions"; /** Configuration key for the executor native library path. */ - public static final String EXECUTOR_LIBRARY_PATH = "spark.executor.extraLibraryOptions"; + public static final String EXECUTOR_EXTRA_LIBRARY_PATH = "spark.executor.extraLibraryOptions"; /** Configuration key for the number of executor CPU cores. */ public static final String EXECUTOR_CORES = "spark.executor.cores"; + /** Returns whether the given string is null or empty. */ protected static boolean isEmpty(String s) { return s == null || s.isEmpty(); } + /** Joins a list of strings using the given separator. */ protected static String join(String sep, String... elements) { StringBuilder sb = new StringBuilder(); for (String e : elements) { @@ -64,6 +68,7 @@ protected static String join(String sep, String... elements) { return sb.toString(); } + /** Joins a list of strings using the given separator. */ protected static String join(String sep, Iterable elements) { StringBuilder sb = new StringBuilder(); for (String e : elements) { @@ -77,6 +82,7 @@ protected static String join(String sep, Iterable elements) { return sb.toString(); } + /** Returns the first value mapped to the given key in the given maps. */ protected static String find(String key, Map... maps) { for (Map map : maps) { String value = (String) map.get(key); @@ -87,7 +93,8 @@ protected static String find(String key, Map... maps) { return null; } - protected static String first(String... candidates) { + /** Returns the first non-empty, non-null string in the given list. */ + protected static String firstNonEmpty(String... candidates) { for (String s : candidates) { if (!isEmpty(s)) { return s; @@ -96,18 +103,129 @@ protected static String first(String... candidates) { return null; } + /** Returns the name of the env variable that holds the native library path. */ + protected static String getLibPathEnvName() { + if (isWindows()) { + return "PATH"; + } + + String os = System.getProperty("os.name"); + if (os.startsWith("Mac OS X")) { + return "DYLD_LIBRARY_PATH"; + } else { + return "LD_LIBRARY_PATH"; + } + } + + /** Returns whether the OS is Windows. */ + protected static boolean isWindows() { + String os = System.getProperty("os.name"); + return os.startsWith("Windows"); + } + + /** + * Parse a string as if it were a list of arguments, in the way that a shell would. + * This tries to follow the way bash parses strings. 
For example: + * + * Input: "\"ab cd\" efgh 'i \" j'" + * Output: [ "ab cd", "efgh", "i \" j" ] + */ + protected static List parseOptionString(String s) { + List opts = new ArrayList(); + StringBuilder opt = new StringBuilder(); + boolean inOpt = false; + boolean inSingleQuote = false; + boolean inDoubleQuote = false; + boolean escapeNext = false; + boolean hasData = false; + + for (int i = 0; i < s.length(); i++) { + int c = s.codePointAt(i); + if (escapeNext) { + opt.appendCodePoint(c); + escapeNext = false; + } else if (inOpt) { + switch (c) { + case '\\': + if (inSingleQuote) { + opt.appendCodePoint(c); + } else { + escapeNext = true; + } + break; + case '\'': + if (inDoubleQuote) { + opt.appendCodePoint(c); + } else { + inSingleQuote = !inSingleQuote; + } + break; + case '"': + if (inSingleQuote) { + opt.appendCodePoint(c); + } else { + inDoubleQuote = !inDoubleQuote; + } + break; + default: + if (inSingleQuote || inDoubleQuote || !Character.isWhitespace(c)) { + opt.appendCodePoint(c); + } else { + opts.add(opt.toString()); + opt.setLength(0); + inOpt = false; + hasData = false; + } + } + } else { + switch (c) { + case '\'': + inSingleQuote = true; + inOpt = true; + hasData = true; + break; + case '"': + inDoubleQuote = true; + inOpt = true; + hasData = true; + break; + case '\\': + escapeNext = true; + inOpt = true; + hasData = true; + break; + default: + if (!Character.isWhitespace(c)) { + inOpt = true; + hasData = true; + opt.appendCodePoint(c); + } + } + } + } + + checkArgument(!inSingleQuote && !inDoubleQuote && !escapeNext, "Invalid option string: %s", s); + if (hasData) { + opts.add(opt.toString()); + } + return opts; + } + + /** Throws IllegalArgumentException if the given object is null. */ protected static void checkNotNull(Object o, String arg) { if (o == null) { throw new IllegalArgumentException(String.format("'%s' must not be null.", arg)); } } + /** Throws IllegalArgumentException with the given message if the check is false. */ protected static void checkArgument(boolean check, String msg, Object... args) { if (!check) { throw new IllegalArgumentException(String.format(msg, args)); } } + /** Throws IllegalStateException with the given message if the check is false. */ protected static void checkState(boolean check, String msg, Object... args) { if (!check) { throw new IllegalStateException(String.format(msg, args)); @@ -117,4 +235,4 @@ protected static void checkState(boolean check, String msg, Object... args) { // To avoid subclassing outside this package. LauncherCommon() { } -} \ No newline at end of file +} diff --git a/launcher/src/main/java/org/apache/spark/launcher/Main.java b/launcher/src/main/java/org/apache/spark/launcher/Main.java index 497c738614b68..ce1cdb3d36018 100644 --- a/launcher/src/main/java/org/apache/spark/launcher/Main.java +++ b/launcher/src/main/java/org/apache/spark/launcher/Main.java @@ -17,13 +17,9 @@ package org.apache.spark.launcher; -import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; -import java.util.HashMap; -import java.util.Iterator; import java.util.List; -import java.util.Map; /** * Command line interface for the Spark launcher. Used internally by Spark scripts. 
@@ -50,20 +46,20 @@ public static void main(String[] argsArray) throws Exception { List args = new ArrayList(Arrays.asList(argsArray)); String className = args.remove(0); - boolean printLaunchCommand = false; + boolean printLaunchCommand; AbstractLauncher launcher; try { if (className.equals("org.apache.spark.deploy.SparkSubmit")) { launcher = new SparkSubmitCliLauncher(args); - printLaunchCommand = !isEmpty(System.getenv("SPARK_PRINT_LAUNCH_COMMAND")); } else if (className.equals("pyspark")) { launcher = new PySparkLauncher(args); } else { launcher = new SparkClassLauncher(className, args); - printLaunchCommand = !isEmpty(System.getenv("SPARK_PRINT_LAUNCH_COMMAND")); } + printLaunchCommand = !isEmpty(System.getenv("SPARK_PRINT_LAUNCH_COMMAND")); } catch (IllegalArgumentException e) { launcher = new UsageLauncher(); + printLaunchCommand = false; } List cmd = launcher.buildLauncherCommand(); diff --git a/launcher/src/main/java/org/apache/spark/launcher/PySparkLauncher.java b/launcher/src/main/java/org/apache/spark/launcher/PySparkLauncher.java index 6786f395990d6..2caa4fb2fb26c 100644 --- a/launcher/src/main/java/org/apache/spark/launcher/PySparkLauncher.java +++ b/launcher/src/main/java/org/apache/spark/launcher/PySparkLauncher.java @@ -44,7 +44,7 @@ protected List buildLauncherCommand() throws IOException { // For backwards compatibility, if a script is specified in // the pyspark command line, then run it using spark-submit. - if (!launcher.getArgs().isEmpty() && launcher.getArgs().get(0).endsWith(".py")) { + if (!launcher.getAppArgs().isEmpty() && launcher.getAppArgs().get(0).endsWith(".py")) { System.err.println( "WARNING: Running python applications through 'pyspark' is deprecated as of Spark 1.0.\n" + "Use ./bin/spark-submit "); @@ -54,24 +54,24 @@ protected List buildLauncherCommand() throws IOException { // When launching the pyspark shell, the spark-submit arguments should be stored in the // PYSPARK_SUBMIT_ARGS env variable. The executable is the PYSPARK_DRIVER_PYTHON env variable // set by the pyspark script, followed by PYSPARK_DRIVER_PYTHON_OPTS. - checkArgument(launcher.getArgs().isEmpty(), - "pyspark does not support any application options."); + checkArgument(launcher.getAppArgs().isEmpty(), + "pyspark does not support any application options."); Properties props = loadPropertiesFile(); - String libPath = find(DRIVER_LIBRARY_PATH, conf, props); + String libPath = find(DRIVER_EXTRA_LIBRARY_PATH, conf, props); StringBuilder submitArgs = new StringBuilder(); for (String arg : launcher.getSparkArgs()) { if (submitArgs.length() > 0) { submitArgs.append(" "); } - submitArgs.append(shQuote(arg)); + submitArgs.append(quote(arg)); } for (String arg : launcher.getDriverArgs()) { if (submitArgs.length() > 0) { submitArgs.append(" "); } - submitArgs.append(shQuote(arg)); + submitArgs.append(quote(arg)); } Map env = new HashMap(); @@ -87,4 +87,20 @@ protected List buildLauncherCommand() throws IOException { return prepareForOs(pyargs, libPath, env); } + /** + * Quotes a string so that it can be used in a command string and be parsed back into a single + * argument by python's "shlex.split()" function. 
+ */ + private String quote(String s) { + StringBuilder quoted = new StringBuilder().append('"'); + for (int i = 0; i < s.length(); i++) { + int cp = s.codePointAt(i); + if (cp == '"' || cp == '\\') { + quoted.appendCodePoint('\\'); + } + quoted.appendCodePoint(cp); + } + return quoted.append('"').toString(); + } + } diff --git a/launcher/src/main/java/org/apache/spark/launcher/SparkClassLauncher.java b/launcher/src/main/java/org/apache/spark/launcher/SparkClassLauncher.java index 2e42dbf5bda0c..e7e137b90c941 100644 --- a/launcher/src/main/java/org/apache/spark/launcher/SparkClassLauncher.java +++ b/launcher/src/main/java/org/apache/spark/launcher/SparkClassLauncher.java @@ -20,7 +20,6 @@ import java.io.File; import java.io.IOException; import java.util.ArrayList; -import java.util.Collections; import java.util.List; import java.util.regex.Pattern; @@ -91,19 +90,19 @@ protected List buildLauncherCommand() throws IOException { return buildSparkSubmitCommand(); } - List cmd = createJavaCommand(); + List cmd = buildJavaCommand(); for (String key : javaOptsKeys) { addOptionString(cmd, System.getenv(key)); } - String mem = first(memKey != null ? System.getenv(memKey) : null, DEFAULT_MEM); + String mem = firstNonEmpty(memKey != null ? System.getenv(memKey) : null, DEFAULT_MEM); cmd.add("-Xms" + mem); cmd.add("-Xmx" + mem); cmd.add("-cp"); cmd.add(join(File.pathSeparator, buildClassPath(extraClassPath))); cmd.add(className); cmd.addAll(classArgs); - return prepareForOs(cmd, null, Collections.emptyMap()); + return prepareForOs(cmd, null); } private List buildSparkSubmitCommand() throws IOException { diff --git a/launcher/src/main/java/org/apache/spark/launcher/SparkLauncher.java b/launcher/src/main/java/org/apache/spark/launcher/SparkLauncher.java index 3b3405eed2efd..f41c007d54af8 100644 --- a/launcher/src/main/java/org/apache/spark/launcher/SparkLauncher.java +++ b/launcher/src/main/java/org/apache/spark/launcher/SparkLauncher.java @@ -23,7 +23,6 @@ import java.net.URL; import java.net.URLClassLoader; import java.util.ArrayList; -import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Properties; @@ -32,7 +31,7 @@ /** * Launcher for Spark applications. *
<p/>
- * Use this class to start Spark applications programatically. The class uses a builder pattern + * Use this class to start Spark applications programmatically. The class uses a builder pattern * to allow clients to configure the Spark application and launch it as a child process. *
<p/>
* There's also support for running the application on a separate thread, although that is to @@ -49,17 +48,17 @@ public class SparkLauncher extends AbstractLauncher { protected String appName; protected String master; protected String deployMode; - protected String userClass; - protected String userResource; + protected String mainClass; + protected String appResource; protected final List sparkArgs; - protected final List userArgs; + protected final List appArgs; protected final List jars; protected final List files; protected final List pyFiles; public SparkLauncher() { this.sparkArgs = new ArrayList(); - this.userArgs = new ArrayList(); + this.appArgs = new ArrayList(); this.jars = new ArrayList(); this.files = new ArrayList(); this.pyFiles = new ArrayList(); @@ -90,46 +89,46 @@ public SparkLauncher setDeployMode(String mode) { * Set the main application resource. This should be the location of a jar file for Scala/Java * applications, or a python script for PySpark applications. */ - public SparkLauncher setAppResource(String path) { - checkNotNull(path, "path"); - this.userResource = path; + public SparkLauncher setAppResource(String resource) { + checkNotNull(resource, "resource"); + this.appResource = resource; return this; } /** Sets the application class name for Java/Scala applications. */ - public SparkLauncher setClass(String userClass) { - checkNotNull(userClass, "userClass"); - this.userClass = userClass; + public SparkLauncher setMainClass(String mainClass) { + checkNotNull(mainClass, "mainClass"); + this.mainClass = mainClass; return this; } /** Adds command line arguments for the application. */ - public SparkLauncher addArgs(String... args) { + public SparkLauncher addAppArgs(String... args) { for (String arg : args) { checkNotNull(arg, "arg"); - userArgs.add(arg); + appArgs.add(arg); } return this; } /** Adds a jar file to be submitted with the application. */ - public SparkLauncher addJar(String path) { - checkNotNull(path, "path"); - jars.add(path); + public SparkLauncher addJar(String jar) { + checkNotNull(jar, "jar"); + jars.add(jar); return this; } /** Adds a file to be submitted with the application. */ - public SparkLauncher addFile(String path) { - checkNotNull(path, "path"); - files.add(path); + public SparkLauncher addFile(String file) { + checkNotNull(file, "file"); + files.add(file); return this; } - /** Adds a a python file / zip / egg to be submitted with the application. */ - public SparkLauncher addPyFile(String path) { - checkNotNull(path, "path"); - pyFiles.add(path); + /** Adds a python file / zip / egg to be submitted with the application. */ + public SparkLauncher addPyFile(String file) { + checkNotNull(file, "file"); + pyFiles.add(file); return this; } @@ -167,17 +166,17 @@ public Thread start(Thread.UncaughtExceptionHandler handler, boolean daemon) thr // cannot be set in this mode. 
Properties props = loadPropertiesFile(); String extraClassPath = null; - if (isRunningDriver(props)) { + if (isClientMode(props)) { checkState( - find(DRIVER_JAVA_OPTIONS, conf, props) == null, + find(DRIVER_EXTRA_JAVA_OPTIONS, conf, props) == null, "Cannot set driver VM options when running in-process."); checkState( - find(DRIVER_LIBRARY_PATH, conf, props) == null, + find(DRIVER_EXTRA_LIBRARY_PATH, conf, props) == null, "Cannot set native library path when running in-process."); checkState( find(DRIVER_MEMORY, conf, props) == null, "Cannot set driver memory when running in-process."); - extraClassPath = find(DRIVER_CLASSPATH, conf, props); + extraClassPath = find(DRIVER_EXTRA_CLASSPATH, conf, props); } List cp = buildClassPath(extraClassPath); @@ -276,23 +275,23 @@ List buildSparkSubmitArgs() { args.add(join(",", pyFiles)); } - if (userClass != null) { + if (mainClass != null) { args.add("--class"); - args.add(userClass); + args.add(mainClass); } args.addAll(sparkArgs); - if (userResource != null) { - args.add(userResource); + if (appResource != null) { + args.add(appResource); } - args.addAll(userArgs); + args.addAll(appArgs); return args; } @Override protected List buildLauncherCommand() throws IOException { - List cmd = createJavaCommand(); + List cmd = buildJavaCommand(); addOptionString(cmd, System.getenv("SPARK_SUBMIT_OPTS")); addOptionString(cmd, System.getenv("SPARK_JAVA_OPTS")); @@ -300,49 +299,40 @@ protected List buildLauncherCommand() throws IOException { // or just launching a cluster app. When running the driver, the JVM's argument will be // modified to cover the driver's configuration. Properties props = loadPropertiesFile(); - boolean isRunningDriver = isRunningDriver(props); + boolean isClientMode = isClientMode(props); - String extraClassPath = isRunningDriver ? find(DRIVER_CLASSPATH, conf, props) : null; + String extraClassPath = isClientMode ? find(DRIVER_EXTRA_CLASSPATH, conf, props) : null; cmd.add("-cp"); cmd.add(join(File.pathSeparator, buildClassPath(extraClassPath))); String libPath = null; - if (isRunningDriver) { + if (isClientMode) { // Figuring out where the memory value come from is a little tricky due to precedence. // Precedence is observed in the following order: // - explicit configuration (setConf()), which also covers --driver-memory cli argument. - // - user properties, if properties file is explicitly set. + // - properties file. 
+ // - SPARK_DRIVER_MEMORY env variable // - SPARK_MEM env variable - // - user properties, if using default file // - default value (512m) - String userMemSetting; - String defaultMemFromProps = null; - if (propertiesFile != null) { - userMemSetting = find(DRIVER_MEMORY, conf, props); - } else { - userMemSetting = conf.get(DRIVER_MEMORY); - defaultMemFromProps = props.getProperty(DRIVER_MEMORY); - } - - String memory = first(userMemSetting, System.getenv("SPARK_MEM"), defaultMemFromProps, - DEFAULT_MEM); + String memory = firstNonEmpty(find(DRIVER_MEMORY, conf, props), + System.getenv("SPARK_DRIVER_MEMORY"), System.getenv("SPARK_MEM"), DEFAULT_MEM); cmd.add("-Xms" + memory); cmd.add("-Xmx" + memory); - addOptionString(cmd, find(DRIVER_JAVA_OPTIONS, conf, props)); - libPath = find(DRIVER_LIBRARY_PATH, conf, props); + addOptionString(cmd, find(DRIVER_EXTRA_JAVA_OPTIONS, conf, props)); + libPath = find(DRIVER_EXTRA_LIBRARY_PATH, conf, props); } cmd.add("org.apache.spark.deploy.SparkSubmit"); cmd.addAll(buildSparkSubmitArgs()); - return prepareForOs(cmd, libPath, Collections.emptyMap()); + return prepareForOs(cmd, libPath); } - private boolean isRunningDriver(Properties userProps) { - String userMaster = first(master, (String) userProps.get(SPARK_MASTER)); + private boolean isClientMode(Properties userProps) { + String userMaster = firstNonEmpty(master, (String) userProps.get(SPARK_MASTER)); return userMaster == null || "client".equals(deployMode) || "yarn-client".equals(userMaster) || - (deployMode == null && userMaster != null && !userMaster.startsWith("yarn-")); + (deployMode == null && !userMaster.startsWith("yarn-")); } private static class SparkSubmitRunner implements Runnable { diff --git a/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCliLauncher.java b/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCliLauncher.java index b2d8077d90c8f..cf5b8df44f7e4 100644 --- a/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCliLauncher.java +++ b/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCliLauncher.java @@ -31,21 +31,22 @@ *
<p/>
* This launcher extends SparkLauncher to add command line parsing compatible with * SparkSubmit. It handles setting driver-side options and special parsing needed - * for the different shells. + * for the different special classes. *
<p/>
* This class has also some special features to aid PySparkLauncher. */ public class SparkSubmitCliLauncher extends SparkLauncher { /** - * This map must match the class names for available shells, since this modifies the way - * command line parsing works. This maps the shell class name to the resource to use when - * calling spark-submit. + * This map must match the class names for available special classes, since this modifies the way + * command line parsing works. This maps the class name to the resource to use when calling + * spark-submit. */ - private static final Map shells = new HashMap(); + private static final Map specialClasses = new HashMap(); static { - shells.put("org.apache.spark.repl.Main", "spark-shell"); - shells.put("org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver", "spark-internal"); + specialClasses.put("org.apache.spark.repl.Main", "spark-shell"); + specialClasses.put("org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver", + "spark-internal"); } private final List driverArgs; @@ -56,28 +57,27 @@ public class SparkSubmitCliLauncher extends SparkLauncher { } SparkSubmitCliLauncher(boolean hasMixedArguments, List args) { - boolean sparkSubmitOptionsEnded = false; this.driverArgs = new ArrayList(); this.hasMixedArguments = hasMixedArguments; new OptionParser().parse(args); } - /** Visible for PySparkLauncher. */ + // Visible for PySparkLauncher. String getAppResource() { - return userResource; + return appResource; } - /** Visible for PySparkLauncher. */ - List getArgs() { - return userArgs; + // Visible for PySparkLauncher. + List getAppArgs() { + return appArgs; } - /** Visible for PySparkLauncher. */ + // Visible for PySparkLauncher. List getSparkArgs() { return sparkArgs; } - /** Visible for PySparkLauncher. */ + // Visible for PySparkLauncher. List getDriverArgs() { return driverArgs; } @@ -108,26 +108,26 @@ protected boolean handle(String opt, String value) { driverArgs.add(opt); driverArgs.add(value); } else if (opt.equals(DRIVER_JAVA_OPTIONS)) { - setConf(LauncherCommon.DRIVER_JAVA_OPTIONS, value); + setConf(LauncherCommon.DRIVER_EXTRA_JAVA_OPTIONS, value); driverArgs.add(opt); driverArgs.add(value); } else if (opt.equals(DRIVER_LIBRARY_PATH)) { - setConf(LauncherCommon.DRIVER_LIBRARY_PATH, value); + setConf(LauncherCommon.DRIVER_EXTRA_LIBRARY_PATH, value); driverArgs.add(opt); driverArgs.add(value); } else if (opt.equals(DRIVER_CLASS_PATH)) { - setConf(LauncherCommon.DRIVER_CLASSPATH, value); + setConf(LauncherCommon.DRIVER_EXTRA_CLASSPATH, value); driverArgs.add(opt); driverArgs.add(value); } else if (opt.equals(CLASS)) { - // The shell launchers require some special command line handling, since they allow + // The special classes require some special command line handling, since they allow // mixing spark-submit arguments with arguments that should be propagated to the shell // itself. Note that for this to work, the "--class" argument must come before any // non-spark-submit arguments. - setClass(value); - if (shells.containsKey(value)) { + setMainClass(value); + if (specialClasses.containsKey(value)) { hasMixedArguments = true; - setAppResource(shells.get(value)); + setAppResource(specialClasses.get(value)); } } else { addSparkArgs(opt, value); @@ -141,7 +141,7 @@ protected boolean handleUnknown(String opt) { // In normal mode, any unrecognized parameter triggers the end of command line parsing. // The remaining params will be appended to the list of SparkSubmit arguments. 
if (hasMixedArguments) { - addArgs(opt); + addAppArgs(opt); return true; } else { addSparkArgs(opt); diff --git a/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitOptionParser.java b/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitOptionParser.java index eb322ae7c0666..5e744ccb0ee57 100644 --- a/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitOptionParser.java +++ b/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitOptionParser.java @@ -34,31 +34,34 @@ public abstract class SparkSubmitOptionParser { // The following constants define the "main" name for the available options. They're defined // to avoid copy & paste of the raw strings where they're needed. - protected static final String ARCHIVES = "--archives"; protected static final String CLASS = "--class"; protected static final String CONF = "--conf"; protected static final String DEPLOY_MODE = "--deploy-mode"; protected static final String DRIVER_CLASS_PATH = "--driver-class-path"; protected static final String DRIVER_CORES = "--driver-cores"; protected static final String DRIVER_JAVA_OPTIONS = "--driver-java-options"; - protected static final String DRIVER_LIBRARY_PATH = "--driver-library -path"; + protected static final String DRIVER_LIBRARY_PATH = "--driver-library-path"; protected static final String DRIVER_MEMORY = "--driver-memory"; - protected static final String EXECUTOR_CORES = "--executor-cores"; protected static final String EXECUTOR_MEMORY = "--executor-memory"; protected static final String FILES = "--files"; protected static final String JARS = "--jars"; protected static final String MASTER = "--master"; protected static final String NAME = "--name"; - protected static final String NUM_EXECUTORS = "--num-executors"; protected static final String PROPERTIES_FILE = "--properties-file"; protected static final String PY_FILES = "--py-files"; - protected static final String QUEUE = "--queue"; protected static final String TOTAL_EXECUTOR_CORES = "--total-executor-cores"; + // Options that do not take arguments. protected static final String HELP = "--help"; protected static final String SUPERVISE = "--supervise"; protected static final String VERBOSE = "--verbose"; + // YARN-only options. + protected static final String ARCHIVES = "--archives"; + protected static final String EXECUTOR_CORES = "--executor-cores"; + protected static final String QUEUE = "--queue"; + protected static final String NUM_EXECUTORS = "--num-executors"; + /** * This is the canonical list of spark-submit options. 
Each entry in the array contains the * different aliases for the same option; the first element of each entry is the "official" diff --git a/launcher/src/test/java/org/apache/spark/launcher/AbstractLauncherSuite.java b/launcher/src/test/java/org/apache/spark/launcher/LauncherCommonSuite.java similarity index 88% rename from launcher/src/test/java/org/apache/spark/launcher/AbstractLauncherSuite.java rename to launcher/src/test/java/org/apache/spark/launcher/LauncherCommonSuite.java index af18147e36cbd..49b088f0e6ae6 100644 --- a/launcher/src/test/java/org/apache/spark/launcher/AbstractLauncherSuite.java +++ b/launcher/src/test/java/org/apache/spark/launcher/LauncherCommonSuite.java @@ -24,14 +24,9 @@ import org.junit.Test; import static org.junit.Assert.*; -public class AbstractLauncherSuite { +import static org.apache.spark.launcher.LauncherCommon.*; - private AbstractLauncher launcher = new AbstractLauncher() { - @Override - protected List buildLauncherCommand() { - throw new UnsupportedOperationException(); - } - }; +public class LauncherCommonSuite { @Test public void testValidOptionStrings() { @@ -39,6 +34,7 @@ public void testValidOptionStrings() { testOpt("a 'b c' \"d\" e", Arrays.asList("a", "b c", "d", "e")); testOpt("a 'b\\\"c' \"'d'\" e", Arrays.asList("a", "b\\\"c", "'d'", "e")); testOpt("a 'b\"c' \"\\\"d\\\"\" e", Arrays.asList("a", "b\"c", "\"d\"", "e")); + testOpt(" a b c \\\\ ", Arrays.asList("a", "b", "c", "\\")); // Following tests ported from UtilsSuite.scala. testOpt("", new ArrayList()); @@ -66,7 +62,7 @@ public void testValidOptionStrings() { } @Test - public void testInalidOptionStrings() { + public void testInvalidOptionStrings() { testInvalidOpt("\\"); testInvalidOpt("\"abcde"); testInvalidOpt("'abcde"); @@ -74,12 +70,12 @@ public void testInalidOptionStrings() { private void testOpt(String opts, List expected) { assertEquals(String.format("test string failed to parse: [[ %s ]]", opts), - expected, launcher.parseOptionString(opts)); + expected, parseOptionString(opts)); } private void testInvalidOpt(String opts) { try { - launcher.parseOptionString(opts); + parseOptionString(opts); fail("Expected exception for invalid option string."); } catch (IllegalArgumentException e) { // pass. 
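
For readers following the tests above, a minimal sketch that drives the tokenizer directly. ParseDemo is a hypothetical class, not part of this patch, and it must sit in the org.apache.spark.launcher package because parseOptionString is only package-visible on LauncherCommon.

package org.apache.spark.launcher;

import java.util.List;

// Hypothetical demo class (not part of this patch). It lives in the
// launcher package because LauncherCommon.parseOptionString is not public.
public class ParseDemo {
  public static void main(String[] args) {
    // Single and double quotes group words; a backslash escapes the next
    // character, so the embedded \" survives into the third token.
    List<String> opts = LauncherCommon.parseOptionString("a 'b c' \"d\\\" e\"");
    System.out.println(opts); // prints [a, b c, d" e]
  }
}
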
diff --git a/launcher/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java b/launcher/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java index df19d1cdfda25..c3e08bc95d48a 100644 --- a/launcher/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java +++ b/launcher/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java @@ -55,10 +55,10 @@ public void testChildProcLauncher() throws Exception { .setSparkHome(System.getProperty("spark.test.home")) .setMaster("local") .setAppResource("spark-internal") - .setConf(SparkLauncher.DRIVER_JAVA_OPTIONS, "-Dfoo=bar -Dtest.name=-testChildProcLauncher") - .setConf(SparkLauncher.DRIVER_CLASSPATH, System.getProperty("java.class.path")) - .setClass(SparkLauncherTestApp.class.getName()) - .addArgs("proc"); + .setConf(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS, "-Dfoo=bar -Dtest.name=-testChildProcLauncher") + .setConf(SparkLauncher.DRIVER_EXTRA_CLASSPATH, System.getProperty("java.class.path")) + .setMainClass(SparkLauncherTestApp.class.getName()) + .addAppArgs("proc"); printArgs(launcher.buildLauncherCommand()); @@ -94,9 +94,9 @@ public void testThreadAppLauncher() throws Exception { .setSparkHome(System.getProperty("spark.test.home")) .setMaster("local") .setAppResource("spark-internal") - .setConf(SparkLauncher.DRIVER_CLASSPATH, System.getProperty("java.class.path")) - .setClass(SparkLauncherTestApp.class.getName()) - .addArgs("thread"); + .setConf(SparkLauncher.DRIVER_EXTRA_CLASSPATH, System.getProperty("java.class.path")) + .setMainClass(SparkLauncherTestApp.class.getName()) + .addAppArgs("thread"); printArgs(launcher.buildLauncherCommand()); @@ -113,9 +113,9 @@ public void uncaughtException(Thread t, Throwable e) { @Test public void testInProcessDriverArgValidator() throws Exception { - testInvalidDriverConf(SparkLauncher.DRIVER_JAVA_OPTIONS); + testInvalidDriverConf(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS); testInvalidDriverConf(SparkLauncher.DRIVER_MEMORY); - testInvalidDriverConf(SparkLauncher.DRIVER_LIBRARY_PATH); + testInvalidDriverConf(SparkLauncher.DRIVER_EXTRA_LIBRARY_PATH); } private void testCmdBuilder(boolean isDriver) throws Exception { @@ -127,12 +127,12 @@ private void testCmdBuilder(boolean isDriver) throws Exception { .setDeployMode(deployMode) .setAppResource("/foo") .setAppName("MyApp") - .setClass("my.Class") - .addArgs("foo", "bar") + .setMainClass("my.Class") + .addAppArgs("foo", "bar") .setConf(SparkLauncher.DRIVER_MEMORY, "1g") - .setConf(SparkLauncher.DRIVER_CLASSPATH, "/driver") - .setConf(SparkLauncher.DRIVER_JAVA_OPTIONS, "-Ddriver") - .setConf(SparkLauncher.DRIVER_LIBRARY_PATH, "/native") + .setConf(SparkLauncher.DRIVER_EXTRA_CLASSPATH, "/driver") + .setConf(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS, "-Ddriver") + .setConf(SparkLauncher.DRIVER_EXTRA_LIBRARY_PATH, "/native") .setConf("spark.foo", "foo"); List cmd = launcher.buildLauncherCommand(); @@ -203,8 +203,8 @@ private void testInvalidDriverConf(String key) throws Exception { new SparkLauncher() .setSparkHome(System.getProperty("spark.test.home")) .setAppResource("spark-internal") - .setClass(SparkLauncherTestApp.class.getName()) - .addArgs("thread") + .setMainClass(SparkLauncherTestApp.class.getName()) + .addAppArgs("thread") .setConf(key, "foo") .start(null, true); fail("Should have failed to start app.");
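
To show the renamed builder API end to end (setMainClass/addAppArgs replacing setClass/addArgs, and the DRIVER_EXTRA_* constants replacing the old DRIVER_* names), a minimal hypothetical client follows. It is not part of this patch: the paths and class names are placeholders, and it uses the in-process start(handler, daemon) entry point that the tests above exercise.

import org.apache.spark.launcher.SparkLauncher;

// Hypothetical client (not part of this patch) driving the renamed builder API.
public class LaunchExample {
  public static void main(String[] args) throws Exception {
    Thread app = new SparkLauncher()
        .setSparkHome("/opt/spark")              // placeholder install dir
        .setMaster("local")
        .setAppResource("/path/to/app.jar")      // placeholder application jar
        .setMainClass("my.Class")                // placeholder main class
        .addAppArgs("foo", "bar")
        // In-process (client mode) runs need the app on the driver's class
        // path; driver memory, JVM options, and library path cannot be set
        // here, per the checkState() calls in start().
        .setConf(SparkLauncher.DRIVER_EXTRA_CLASSPATH,
            System.getProperty("java.class.path"))
        .start(null, false);                     // separate, non-daemon thread
    app.join();
  }
}
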