Skip to content
This repository has been archived by the owner on Jun 16, 2023. It is now read-only.

Commit

Permalink
refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
硕塞 committed Feb 13, 2017
1 parent f4124ff commit 66f1c24
Show file tree
Hide file tree
Showing 7 changed files with 407 additions and 388 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@
*/
public class JstormOnYarn {
private static final Log LOG = LogFactory.getLog(JstormOnYarn.class);
// Main class to invoke application master
private final String appMasterMainClass;
private JstormClientContext jstormClientContext = new JstormClientContext();

Expand Down Expand Up @@ -95,54 +94,20 @@ private void printUsage() {

/**
* Parse command line options
*
* @param args Parsed command line options
* @return Whether the init was successful to run the client
* @throws ParseException
*/
public boolean init(String[] args) throws ParseException {

CommandLine cliParser = new GnuParser().parse(jstormClientContext.opts, args);
if (args.length == 0) {
throw new IllegalArgumentException("No args specified for client to initialize");
}
if (cliParser.hasOption(JOYConstants.LOG_PROPERTIES)) {
String log4jPath = cliParser.getOptionValue(JOYConstants.LOG_PROPERTIES);
try {
Log4jPropertyHelper.updateLog4jConfiguration(JstormOnYarn.class, log4jPath);
} catch (Exception e) {
LOG.warn("Can not set up custom log4j properties. " + e);
}
}
if (cliParser.hasOption(JOYConstants.HELP)) {
printUsage();
return false;
}
if (cliParser.hasOption(JOYConstants.HELP)) {
jstormClientContext.debugFlag = true;
}
if (cliParser.hasOption(JOYConstants.KEEP_CONTAINERS_ACROSS_APPLICATION_ATTEMPTS)) {
jstormClientContext.keepContainers = true;
}
jstormClientContext.appName = cliParser.getOptionValue(JOYConstants.APP_NAME_KEY, JOYConstants.CLIIENT_CLASS);
jstormClientContext.amPriority = Integer.parseInt(cliParser.getOptionValue(JOYConstants.PRIORITY, JOYConstants.DEFAULT_PRIORITY));
jstormClientContext.amQueue = cliParser.getOptionValue(JOYConstants.QUEUE, JOYConstants.QUEUE_NAME);
jstormClientContext.amMemory = Integer.parseInt(cliParser.getOptionValue(JOYConstants.MASTER_MEMORY, JOYConstants.DEFAULT_MASTER_MEMORY));
jstormClientContext.amVCores = Integer.parseInt(cliParser.getOptionValue(JOYConstants.MASTER_VCORES, JOYConstants.DEFAULT_MASTER_VCORES));

if (jstormClientContext.amMemory < 0) {
throw new IllegalArgumentException("Invalid memory specified for application master, exiting."
+ " Specified memory=" + jstormClientContext.amMemory);
}
if (jstormClientContext.amVCores < 0) {
throw new IllegalArgumentException("Invalid virtual cores specified for application master, exiting."
+ " Specified virtual cores=" + jstormClientContext.amVCores);
}

if (!cliParser.hasOption(JOYConstants.JAR)) {
throw new IllegalArgumentException("No jar file specified for application master");
}

jstormClientContext.appMasterJar = cliParser.getOptionValue(JOYConstants.JAR);
jstormClientContext.libJars = cliParser.getOptionValue(JOYConstants.LIB_JAR);
jstormClientContext.homeDir = cliParser.getOptionValue(JOYConstants.HOME_DIR);
Expand All @@ -151,116 +116,9 @@ public boolean init(String[] args) throws ParseException {
jstormClientContext.nameNodeHost = cliParser.getOptionValue(JOYConstants.NN_ADDRESS, JOYConstants.EMPTY);
jstormClientContext.deployPath = cliParser.getOptionValue(JOYConstants.DEPLOY_PATH, JOYConstants.EMPTY);
jstormClientContext.hadoopConfDir = cliParser.getOptionValue(JOYConstants.HADOOP_CONF_DIR, JOYConstants.EMPTY);

jstormClientContext.instanceName = cliParser.getOptionValue(JOYConstants.INSTANCE_NAME, JOYConstants.EMPTY);
LOG.info("Application client instance name:" + jstormClientContext.instanceName);

if (!jstormClientContext.rmHost.equals(JOYConstants.EMPTY)) {
jstormClientContext.conf.set(JOYConstants.RM_ADDRESS_KEY, jstormClientContext.rmHost, JOYConstants.YARN_CONF_MODE);
}
if (!jstormClientContext.nameNodeHost.equals(JOYConstants.EMPTY)) {
jstormClientContext.conf.set(JOYConstants.FS_DEFAULTFS_KEY, jstormClientContext.nameNodeHost);
}

LOG.info(JstormOnYarn.class.getProtectionDomain()
.getCodeSource().getLocation().getPath());
String jarPath = JstormOnYarn.class.getProtectionDomain()
.getCodeSource().getLocation().getPath();
if (jstormClientContext.confFile == null) {
JstormYarnUtils.getYarnConfFromJar(jarPath);
this.jstormClientContext.conf.addResource(JOYConstants.CONF_NAME);
} else {
Path jstormyarnConfPath = new Path(jstormClientContext.confFile);
LOG.info(jstormyarnConfPath.getName());
this.jstormClientContext.conf.addResource(jstormyarnConfPath);
}

if (!StringUtils.isBlank(jstormClientContext.hadoopConfDir)) {
try {
Collection<File> files = FileUtils.listFiles(new File(jstormClientContext.hadoopConfDir), new String[]{JOYConstants.XML}, true);
for (File file : files) {
LOG.info("adding hadoop conf file to conf: " + file.getAbsolutePath());
this.jstormClientContext.conf.addResource(file.getAbsolutePath());
}
} catch (Exception ex) {
LOG.error("failed to list hadoop conf dir: " + jstormClientContext.hadoopConfDir);
}
}

if (!cliParser.hasOption(JOYConstants.SHELL_SCRIPT)) {
String jarShellScriptPath = jarPath + JOYConstants.START_JSTORM_SHELL;
try {
InputStream stream = new FileInputStream(jarShellScriptPath);
FileOutputStream out = new FileOutputStream(JOYConstants.START_JSTORM_SHELL);
out.write(IOUtils.toByteArray(stream));
out.close();
jstormClientContext.shellScriptPath = JOYConstants.START_JSTORM_SHELL;
} catch (Exception e) {
throw new IllegalArgumentException(
"No shell script specified to be executed by application master to start nimbus and supervisor");
}
} else if (cliParser.hasOption(JOYConstants.SHELL_COMMAND) && cliParser.hasOption(JOYConstants.SHELL_SCRIPT)) {
throw new IllegalArgumentException("Can not specify shell_command option " +
"and shell_script option at the same time");
} else if (cliParser.hasOption(JOYConstants.SHELL_COMMAND)) {
jstormClientContext.shellCommand = cliParser.getOptionValue(JOYConstants.SHELL_COMMAND);
} else {
jstormClientContext.shellScriptPath = cliParser.getOptionValue(JOYConstants.SHELL_SCRIPT);
}
if (cliParser.hasOption(JOYConstants.SHELL_ARGS)) {
jstormClientContext.shellArgs = cliParser.getOptionValues(JOYConstants.SHELL_ARGS);
}
if (cliParser.hasOption(JOYConstants.SHELL_ENV)) {
String envs[] = cliParser.getOptionValues(JOYConstants.SHELL_ENV);
for (String env : envs) {
env = env.trim();
int index = env.indexOf(JOYConstants.EQUAL);
if (index == -1) {
jstormClientContext.shellEnv.put(env, JOYConstants.EMPTY);
continue;
}
String key = env.substring(0, index);
String val = JOYConstants.EMPTY;
if (index < (env.length() - 1)) {
val = env.substring(index + 1);
}
jstormClientContext.shellEnv.put(key, val);
}
}
jstormClientContext.shellCmdPriority = Integer.parseInt(cliParser.getOptionValue(JOYConstants.SHELL_CMD_PRIORITY, JOYConstants.SHELL_CMD_PRIORITY_DEFAULT_VALUE));
//set AM memory default to 1000mb
jstormClientContext.containerMemory = Integer.parseInt(cliParser.getOptionValue(JOYConstants.CONTAINER_MEMORY, JOYConstants.DEFAULT_CONTAINER_MEMORY));
jstormClientContext.containerVirtualCores = Integer.parseInt(cliParser.getOptionValue(JOYConstants.CONTAINER_VCORES, JOYConstants.DEFAULT_CONTAINER_VCORES));
jstormClientContext.numContainers = Integer.parseInt(cliParser.getOptionValue(JOYConstants.NUM_CONTAINERS, JOYConstants.DEFAULT_NUM_CONTAINER));

if (jstormClientContext.containerMemory < 0 || jstormClientContext.containerVirtualCores < 0 || jstormClientContext.numContainers < 1) {
throw new IllegalArgumentException("Invalid no. of containers or container memory/vcores specified,"
+ " exiting."
+ " Specified containerMemory=" + jstormClientContext.containerMemory
+ ", containerVirtualCores=" + jstormClientContext.containerVirtualCores
+ ", numContainer=" + jstormClientContext.numContainers);
}

jstormClientContext.nodeLabelExpression = cliParser.getOptionValue(JOYConstants.NODE_LABEL_EXPRESSION, null);
jstormClientContext.clientTimeout = Integer.parseInt(cliParser.getOptionValue(JOYConstants.TIMEOUT, JOYConstants.DEFAULT_CLIENT_TIME_OUT));

jstormClientContext.attemptFailuresValidityInterval =
Long.parseLong(cliParser.getOptionValue(
JOYConstants.ATTEMPT_FAILURES_VALIDITY_INTERVAL, JOYConstants.DEFAULT_ATTEMPT_FAILURES_VALIDITY_INTERVAL));

jstormClientContext.log4jPropFile = cliParser.getOptionValue(JOYConstants.LOG_PROPERTIES, JOYConstants.EMPTY);

// Get timeline domain options
if (cliParser.hasOption(JOYConstants.DOMAIN)) {
jstormClientContext.domainId = cliParser.getOptionValue(JOYConstants.DOMAIN);
jstormClientContext.toCreateDomain = cliParser.hasOption(JOYConstants.CREATE);
if (cliParser.hasOption(JOYConstants.VIEW_ACLS)) {
jstormClientContext.viewACLs = cliParser.getOptionValue(JOYConstants.VIEW_ACLS);
}
if (cliParser.hasOption(JOYConstants.MODIFY_ACLS)) {
jstormClientContext.modifyACLs = cliParser.getOptionValue(JOYConstants.MODIFY_ACLS);
}
}
JstormYarnUtils.checkAndSetOptions(cliParser, jstormClientContext);
return true;
}

Expand Down Expand Up @@ -349,7 +207,6 @@ public boolean run() throws IOException, YarnException {

LOG.info("Copy App Master jar from local filesystem and add to local environment");
// Copy the application master jar to the filesystem
// Create a local resource to point to the destination jar path
FileSystem fs = FileSystem.get(jstormClientContext.conf);
addToLocalResources(fs, jstormClientContext.appMasterJar, JOYConstants.appMasterJarPath, appId.toString(),
localResources, null);
Expand Down Expand Up @@ -533,9 +390,7 @@ public boolean run() throws IOException, YarnException {
// Set the queue to which this application is to be submitted in the RM
appContext.setQueue(jstormClientContext.amQueue);

// Submit the application to the applications manager
LOG.info("Submitting application to ASM");
LOG.info("conf instanceName : " + jstormClientContext.conf.get(JOYConstants.INSTANCE_NAME_KEY));

//check configuration
if (JstormYarnUtils.isUnset(jstormClientContext.conf.get(JOYConstants.INSTANCE_NAME_KEY))) {
Expand Down
Loading

0 comments on commit 66f1c24

Please sign in to comment.