Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support reading config file from hdfs #106

Merged
merged 2 commits into from
Nov 9, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,15 @@

package com.vesoft.exchange.common.config

import java.io.File
import java.io.{File, InputStreamReader}
import java.nio.file.Files

import com.google.common.net.HostAndPort
import com.typesafe.config.{Config, ConfigFactory}
import com.vesoft.exchange.Argument
import com.vesoft.exchange.common.KeyPolicy
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FSDataInputStream, FileSystem, Path}
import org.apache.log4j.Logger

import scala.collection.mutable
Expand Down Expand Up @@ -255,12 +257,21 @@ object Configs {
* @param configPath
* @return
*/
def parse(configPath: File): Configs = {
if (!Files.exists(configPath.toPath)) {
throw new IllegalArgumentException(s"${configPath} not exist")
def parse(configPath: String): Configs = {
var config: Config = null
if (configPath.startsWith("hdfs://")) {
val hadoopConfig: Configuration = new Configuration()
val fs: FileSystem = org.apache.hadoop.fs.FileSystem.get(hadoopConfig)
val file: FSDataInputStream = fs.open(new Path(configPath))
val reader = new InputStreamReader(file)
config = ConfigFactory.parseReader(reader)
} else {
if (!Files.exists(new File(configPath).toPath)) {
throw new IllegalArgumentException(s"${configPath} not exist")
}
config = ConfigFactory.parseFile(new File(configPath))
}

val config = ConfigFactory.parseFile(configPath)
val nebulaConfig = config.getConfig("nebula")
val addresses = nebulaConfig.getStringList("address.graph").asScala.toList
val metaAddresses = nebulaConfig.getStringList("address.meta").asScala.toList
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ class ConfigsSuite {
assert(c.hive)
assert(c.directly)

val configs = Configs.parse(new File(c.config))
val configs = Configs.parse(c.config)
val dataBaseConfigEntry = configs.databaseConfig
val userConfig = configs.userConfig
val connectionConfig = configs.connectionConfig
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ object Exchange {
sys.exit(-1)
}

val configs = Configs.parse(new File(c.config))
val configs = Configs.parse(c.config)
LOG.info(s"Config ${configs}")

val session = SparkSession
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import scala.collection.JavaConverters._

class EdgeProcessorSuite {
val config: Configs =
Configs.parse(new File("../exchange-common/src/test/resources/process_application.conf"))
Configs.parse("../exchange-common/src/test/resources/process_application.conf")

var data: DataFrame = null
var edgeConfig: EdgeConfigEntry = config.edgesConfig.head
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ class VerticesProcessorSuite {
private[this] lazy val LOG = Logger.getLogger(this.getClass)

val config: Configs =
Configs.parse(new File("../exchange-common/src/test/resources/process_application.conf"))
Configs.parse("../exchange-common/src/test/resources/process_application.conf")

var data: DataFrame = null
var tagConfig: TagConfigEntry = config.tagsConfig.head
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ object Exchange {
sys.exit(-1)
}

val configs = Configs.parse(new File(c.config))
val configs = Configs.parse(c.config)
LOG.info(s"Config ${configs}")

val session = SparkSession
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import scala.collection.JavaConverters._

class EdgeProcessorSuite {
val config: Configs =
Configs.parse(new File("../exchange-common/src/test/resources/process_application.conf"))
Configs.parse("../exchange-common/src/test/resources/process_application.conf")

var data: DataFrame = null
var edgeConfig: EdgeConfigEntry = config.edgesConfig.head
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import scala.collection.JavaConverters._

class VerticesProcessorSuite {
val config: Configs =
Configs.parse(new File("../exchange-common/src/test/resources/process_application.conf"))
Configs.parse("../exchange-common/src/test/resources/process_application.conf")

var data: DataFrame = null
var tagConfig: TagConfigEntry = config.tagsConfig.head
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ object Exchange {
sys.exit(-1)
}

val configs = Configs.parse(new File(c.config))
val configs = Configs.parse(c.config)
LOG.info(s"Config ${configs}")

val session = SparkSession
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import scala.collection.JavaConverters._

class EdgeProcessorSuite {
val config: Configs =
Configs.parse(new File("../exchange-common/src/test/resources/process_application.conf"))
Configs.parse("../exchange-common/src/test/resources/process_application.conf")

var data: DataFrame = null
var edgeConfig: EdgeConfigEntry = config.edgesConfig.head
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import scala.collection.JavaConverters._

class VerticesProcessorSuite {
val config: Configs =
Configs.parse(new File("../exchange-common/src/test/resources/process_application.conf"))
Configs.parse("../exchange-common/src/test/resources/process_application.conf")

var data: DataFrame = null
var tagConfig: TagConfigEntry = config.tagsConfig.head
Expand Down