Skip to content

Commit

Permalink
Refactor taskmanager config for properties object
Browse files Browse the repository at this point in the history
  • Loading branch information
tobegit3hub committed Jul 17, 2023
1 parent 7b7a371 commit 5ddfeac
Show file tree
Hide file tree
Showing 4 changed files with 114 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import java.util.Arrays;
import java.util.Properties;

import static com.sun.tools.doclint.Entity.prop;

/**
* The global configuration of TaskManager.
*
Expand All @@ -32,8 +34,67 @@
public class TaskManagerConfig {
private static Logger logger = LoggerFactory.getLogger(TaskManagerConfig.class);

public static String HOST;
public static int PORT;
private volatile static TaskManagerConfig instance;

private Properties props;

public Properties getProps() {
return props;
}

private static TaskManagerConfig getInstance() throws ConfigException {
if (instance == null) {
instance = new TaskManagerConfig();
}
return instance;
}

public static void validate() throws ConfigException {
getInstance();
}

protected static String getString(String key) {
try {
return getInstance().getProps().getProperty(key);
} catch (ConfigException e) {
e.printStackTrace();
return "";
}
}

protected static int getInt(String key) {
return Integer.parseInt(getString(key));
}

protected static long getLong(String key) {
return Long.parseLong(getString(key));
}

protected static boolean getBool(String key) {
return Boolean.parseBoolean(getString(key));
}


public static String getServerHost() {
return getString("server.host");
}

public static int getServerPort() {
return getInt("server.port");
}

public static int getServerWorkerThreads() {
return getInt("server.worker_threads");
}

public static int getServerIoThreads() {
return getInt("server.io_threads");
}

public static int getChannelKeepAliveTime() {
return getInt("server.channel_keep_alive_time");
}

public static int WORKER_THREAD;
public static int IO_THREAD;
public static int CHANNEL_KEEP_ALIVE_TIME;
Expand Down Expand Up @@ -67,25 +128,62 @@ public class TaskManagerConfig {
public static String K8S_HADOOP_CONFIGMAP_NAME;
public static String K8S_MOUNT_LOCAL_PATH;

private static volatile boolean isParsed = false;

public static void parse() throws ConfigException {
if (!isParsed) {
doParse();
isParsed = true;
public static void print() throws ConfigException {
StringBuilder builder = new StringBuilder();

Properties props = getInstance().getProps();
for (String key : props.stringPropertyNames()) {
String value = props.getProperty(key);
builder.append(key + " = " + value + "\n");
}

logger.info("TaskManager config: " + builder.toString());
}

public static void doParse() throws ConfigException {
Properties prop = new Properties();
public TaskManagerConfig() throws ConfigException {
props = new Properties();

// Load local properties file
try {
prop.load(TaskManagerConfig.class.getClassLoader().getResourceAsStream("taskmanager.properties"));
props.load(TaskManagerConfig.class.getClassLoader().getResourceAsStream("taskmanager.properties"));
} catch (IOException e) {
throw new ConfigException(String.format("Fail to load taskmanager.properties, message: ", e.getMessage()));
}

HOST = prop.getProperty("server.host", "0.0.0.0");
if (props.getProperty("server.host") == null) {
props.setProperty("server.host", "0.0.0.0");
}


if (props.getProperty("server.port") == null) {
props.setProperty("server.port", "9902");
}

if (props.getProperty("") == null) {
props.setProperty("", "");
}
if (props.getProperty("") == null) {
props.setProperty("", "");
}
if (props.getProperty("") == null) {
props.setProperty("", "");
}
if (props.getProperty("") == null) {
props.setProperty("", "");
}
if (props.getProperty("") == null) {
props.setProperty("", "");
}
if (props.getProperty("") == null) {
props.setProperty("", "");
}



PORT = Integer.parseInt(prop.getProperty("server.port", "9902"));


if (PORT < 1 || PORT > 65535) {
throw new ConfigException("server.port", "invalid port, should be in range of 1 through 65535");
}
Expand Down Expand Up @@ -268,12 +366,10 @@ public static void doParse() throws ConfigException {
}

public static boolean isK8s() throws ConfigException {
parse();
return SPARK_MASTER.equals("k8s") || SPARK_MASTER.equals("kubernetes");
}

public static boolean isYarnCluster() throws ConfigException {
parse();
return SPARK_MASTER.equals("yarn") || SPARK_MASTER.equals("yarn-cluster");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public class TaskManagerServer {
* @throws ConfigException if config file does not exist or some configs are incorrect.
*/
public TaskManagerServer() throws ConfigException {
TaskManagerConfig.parse();
TaskManagerConfig.print();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public class TaskManagerImpl implements TaskManagerInterface {
public TaskManagerImpl() throws InterruptedException, ConfigException {
jobResultSaver = new JobResultSaver();

TaskManagerConfig.parse();
TaskManagerConfig.validate();

initExternalFunction();
}
Expand Down Expand Up @@ -220,7 +220,7 @@ public TaskManager.RunBatchSqlResponse RunBatchSql(TaskManager.RunBatchSqlReques
// HOST can't be 0.0.0.0 if distributed or spark is not local
confMap.put("spark.openmldb.savejobresult.http",
String.format("http://%s:%d/openmldb.taskmanager.TaskManagerServer/SaveJobResult",
TaskManagerConfig.HOST, TaskManagerConfig.PORT));
TaskManagerConfig.getServerHost(), TaskManagerConfig.getServerPort()));
// we can't get spark job id here, so we use JobResultSaver id, != spark job id
// if too much running jobs to save result, throw exception
int resultId = jobResultSaver.genResultId();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,8 @@ public FailoverWatcher() throws IOException {
zkQuorum = TaskManagerConfig.ZK_CLUSTER;
sessionTimeout = TaskManagerConfig.ZK_SESSION_TIMEOUT;
connectRetryTimes = 3;
String serverHost = TaskManagerConfig.HOST;
int serverPort = TaskManagerConfig.PORT;
String serverHost = TaskManagerConfig.getServerHost();
int serverPort = TaskManagerConfig.getServerPort();
hostPort = new HostPort(serverHost, serverPort);

connectZooKeeper();
Expand Down

0 comments on commit 5ddfeac

Please sign in to comment.