support oracle datasource #59

Merged: 3 commits, Aug 23, 2022
@@ -572,6 +572,7 @@ object Configs {
case "MAXCOMPUTE" => SourceCategory.MAXCOMPUTE
case "CLICKHOUSE" => SourceCategory.CLICKHOUSE
case "POSTGRESQL" => SourceCategory.POSTGRESQL
case "ORACLE" => SourceCategory.ORACLE
case _ => throw new IllegalArgumentException(s"${category} not support")
}
}
@@ -671,6 +672,16 @@ object Configs {
config.getString("password"),
getOrElse(config, "sentence", null)
)
case SourceCategory.ORACLE =>
OracleConfigEntry(
SourceCategory.ORACLE,
config.getString("url"),
config.getString("driver"),
config.getString("user"),
config.getString("passwd"),
config.getString("table"),
getOrElse(config, "sentence", null)
)
case SourceCategory.KAFKA =>
val intervalSeconds =
if (config.hasPath("interval.seconds")) config.getInt("interval.seconds")
@@ -26,7 +26,8 @@ object SourceCategory extends Enumeration {
val HBASE = Value("HBASE")
val MAXCOMPUTE = Value("MAXCOMPUTE")
val CLICKHOUSE = Value("CLICKHOUSE")
val POSTGRESQL = Value("POSTGRESQL")
val POSTGRESQL = Value("POSTGRESQL")
val ORACLE = Value("ORACLE")

val SOCKET = Value("SOCKET")
val KAFKA = Value("KAFKA")
@@ -159,27 +160,26 @@ case class MySQLSourceConfigEntry(override val category: SourceCategory.Value,
}

/**
* PostgreSQLSourceConfigEntry
*
* @param category
* @param host
* @param port
* @param database
* @param table
* @param user
* @param password
* @param sentence
*/
* PostgreSQLSourceConfigEntry
*
* @param category
* @param host
* @param port
* @param database
* @param table
* @param user
* @param password
* @param sentence
*/
case class PostgreSQLSourceConfigEntry(override val category: SourceCategory.Value,
host: String,
port: Int,
database: String,
table: String,
user: String,
password: String,
override val sentence: String
)
extends ServerDataSourceConfigEntry {
override val sentence: String)
extends ServerDataSourceConfigEntry {
require(
host.trim.length != 0 && port > 0 && database.trim.length > 0 && table.trim.length > 0 && user.trim.length > 0)

@@ -296,3 +296,19 @@ case class ClickHouseConfigEntry(override val category: SourceCategory.Value,
s"ClickHouse source {url:$url, user:$user, passwd:$passwd, numPartition:$numPartition, table:$table, sentence:$sentence}"
}
}

/**
* OracleConfigEntry
*/
case class OracleConfigEntry(override val category: SourceCategory.Value,
url: String,
driver: String,
user: String,
passwd: String,
table: String,
override val sentence: String)
extends ServerDataSourceConfigEntry {
override def toString: String = {
s"Oracle source {url:$url, driver:$driver, user:$user, passwd:$passwd, table:$table, sentence:$sentence}"
}
}
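
For readers skimming the diff, here is a minimal sketch (not part of this PR) of instantiating the new case class directly; all connection values are placeholders. Note that toString interpolates every field, including passwd, so the entry should be logged with care.

// Hypothetical values, for illustration only.
val entry = OracleConfigEntry(
  category = SourceCategory.ORACLE,
  url = "jdbc:oracle:thin:@127.0.0.1:1521:ORCL",
  driver = "oracle.jdbc.driver.OracleDriver",
  user = "scott",
  passwd = "tiger",
  table = "SCOTT.PERSON",
  sentence = null
)
println(entry)
// Oracle source {url:jdbc:oracle:thin:@127.0.0.1:1521:ORCL, driver:oracle.jdbc.driver.OracleDriver, user:scott, passwd:tiger, table:SCOTT.PERSON, sentence:null}
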
5 changes: 5 additions & 0 deletions nebula-exchange_spark_2.2/pom.xml
@@ -446,6 +446,11 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.oracle.database.jdbc</groupId>
<artifactId>ojdbc11</artifactId>
<version>21.4.0.0</version>
</dependency>
<dependency>
<groupId>com.vesoft</groupId>
<artifactId>exchange-common</artifactId>
@@ -10,8 +10,41 @@ import org.apache.spark.sql.{DataFrame, SparkSession}
import java.io.File
import com.vesoft.exchange.Argument
import com.vesoft.exchange.common.{CheckPointHandler, ErrorHandler}
import com.vesoft.exchange.common.config.{ClickHouseConfigEntry, Configs, DataSourceConfigEntry, FileBaseSourceConfigEntry, HBaseSourceConfigEntry, HiveSourceConfigEntry, JanusGraphSourceConfigEntry, KafkaSourceConfigEntry, MaxComputeConfigEntry, MySQLSourceConfigEntry, Neo4JSourceConfigEntry, PostgreSQLSourceConfigEntry, PulsarSourceConfigEntry, SinkCategory, SourceCategory}
import com.vesoft.nebula.exchange.reader.{CSVReader, ClickhouseReader, HBaseReader, HiveReader, JSONReader, JanusGraphReader, KafkaReader, MaxcomputeReader, MySQLReader, Neo4JReader, ORCReader, ParquetReader, PostgreSQLReader, PulsarReader}
import com.vesoft.exchange.common.config.{
ClickHouseConfigEntry,
Configs,
DataSourceConfigEntry,
FileBaseSourceConfigEntry,
HBaseSourceConfigEntry,
HiveSourceConfigEntry,
JanusGraphSourceConfigEntry,
KafkaSourceConfigEntry,
MaxComputeConfigEntry,
MySQLSourceConfigEntry,
Neo4JSourceConfigEntry,
OracleConfigEntry,
PostgreSQLSourceConfigEntry,
PulsarSourceConfigEntry,
SinkCategory,
SourceCategory
}
import com.vesoft.nebula.exchange.reader.{
CSVReader,
ClickhouseReader,
HBaseReader,
HiveReader,
JSONReader,
JanusGraphReader,
KafkaReader,
MaxcomputeReader,
MySQLReader,
Neo4JReader,
OracleReader,
ORCReader,
PostgreSQLReader,
ParquetReader,
PulsarReader
}
import com.vesoft.exchange.common.processor.ReloadProcessor
import com.vesoft.nebula.exchange.processor.{EdgeProcessor, VerticesProcessor}
import org.apache.log4j.Logger
@@ -283,6 +316,11 @@ object Exchange {
val reader = new ClickhouseReader(session, clickhouseConfigEntry)
Some(reader.read())
}
case SourceCategory.ORACLE => {
val oracleConfig = config.asInstanceOf[OracleConfigEntry]
val reader = new OracleReader(session, oracleConfig)
Some(reader.read())
}
case _ => {
LOG.error(s"Data source ${config.category} not supported")
None
@@ -14,6 +14,7 @@ import com.vesoft.exchange.common.config.{
MaxComputeConfigEntry,
MySQLSourceConfigEntry,
Neo4JSourceConfigEntry,
OracleConfigEntry,
PostgreSQLSourceConfigEntry,
ServerDataSourceConfigEntry
}
@@ -289,3 +290,30 @@ class ClickhouseReader(override val session: SparkSession,
df
}
}

/**
* Oracle reader
*/
class OracleReader(override val session: SparkSession, oracleConfig: OracleConfigEntry)
extends ServerBaseReader(session, oracleConfig.sentence) {
Class.forName(oracleConfig.driver)
override def read(): DataFrame = {
var df = session.read
.format("jdbc")
.option("url", oracleConfig.url)
.option("dbtable", oracleConfig.table)
.option("user", oracleConfig.user)
.option("password", oracleConfig.passwd)
.option("driver", oracleConfig.driver)
.load()

if (oracleConfig.sentence != null) {
val tableName = if (oracleConfig.table.contains(".")) {
oracleConfig.table.split("\\.")(1)
} else oracleConfig.table
df.createOrReplaceTempView(tableName)
df = session.sql(sentence)
}
df
}
}
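
A usage sketch, not part of this PR: with table set to "db.person" the reader registers the temporary view as "person", so an override sentence must reference that short name. The SparkSession settings, connection values, table, and column names below are placeholders, and the ojdbc driver is assumed to be on the classpath.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("oracle-reader-sketch")
  .master("local[*]")
  .getOrCreate()

val cfg = OracleConfigEntry(
  SourceCategory.ORACLE,
  "jdbc:oracle:thin:@127.0.0.1:1521:ORCL",
  "oracle.jdbc.driver.OracleDriver",
  "scott",
  "tiger",
  "db.person",
  "select id, name from person" // runs against the "person" temp view
)

// OracleReader loads the whole table via JDBC, then applies the sentence, if any.
val df = new OracleReader(spark, cfg).read()
df.show(10)
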
5 changes: 5 additions & 0 deletions nebula-exchange_spark_2.4/pom.xml
@@ -533,6 +533,11 @@
<artifactId>emr-maxcompute_2.11</artifactId>
<version>2.2.0</version>
</dependency>
<dependency>
<groupId>com.oracle.database.jdbc</groupId>
<artifactId>ojdbc11</artifactId>
<version>21.4.0.0</version>
</dependency>
<dependency>
<groupId>com.vesoft</groupId>
<artifactId>exchange-common</artifactId>
152 changes: 101 additions & 51 deletions nebula-exchange_spark_2.4/src/main/resources/application.conf
@@ -318,29 +318,52 @@
}

# PostgreSQL
{
name: tag9
type: {
source: postgresql
sink: client
}
user:root
host: "127.0.0.1"
port: "5432"
database: "database"
table: "table"
user: "root"
password: "nebula"
sentence: "select postgre-field0, postgre-field1, postgre-field2 from table"
fields: [postgre-field-0, postgre-field-1, postgre-field-2]
nebula.fields: [nebula-field-0, nebula-field-1, nebula-field-2]
vertex: {
field: postgre-field-0
# policy: "hash"
}
batch: 256
partition: 32
}
{
name: tag9
type: {
source: postgresql
sink: client
}
user:root
host: "127.0.0.1"
port: "5432"
database: "database"
table: "table"
user: "root"
password: "nebula"
sentence: "select postgre-field0, postgre-field1, postgre-field2 from table"
fields: [postgre-field-0, postgre-field-1, postgre-field-2]
nebula.fields: [nebula-field-0, nebula-field-1, nebula-field-2]
vertex: {
field: postgre-field-0
# policy: "hash"
}
batch: 256
partition: 32
}

# Oracle
{
name: tag10
type: {
source: oracle
sink: client
}
url:"jdbc:oracle:thin:@host:1521:db"
driver: "oracle.jdbc.driver.OracleDriver"
user: "root"
passwd: "nebula"
table: "db.table"
sentence: "select oracle-field0, oracle-field1, oracle-field2 from table"
fields: [oracle-field-0, oracle-field-1, oracle-field-2]
nebula.fields: [nebula-field-0, nebula-field-1, nebula-field-2]
vertex: {
field: oracle-field-0
# policy: "hash"
}
batch: 256
partition: 32
}
]

# Processing edges
@@ -591,33 +614,60 @@
}

# PostgreSQL
{
name: edge-name-8
type: {
source: postgresql
sink: client
}
user:root
host: "127.0.0.1"
port: "5432"
database: "database"
table: "table"
user: "root"
password: "nebula"
sentence: "select postgre-field0, postgre-field1, postgre-field2 from table"
fields: [postgre-field-0, postgre-field-1, postgre-field-2]
nebula.fields: [nebula-field-0, nebula-field-1, nebula-field-2]
source: {
field: mysql-field-0
# policy: "hash"
}
source: {
field: mysql-field-0
# policy: "hash"
}
ranking: postgre-field-1
batch: 256
partition: 32
}
{
name: edge-name-8
type: {
source: postgresql
sink: client
}
user:root
host: "127.0.0.1"
port: "5432"
database: "database"
table: "table"
user: "root"
password: "nebula"
sentence: "select postgre-field0, postgre-field1, postgre-field2 from table"
fields: [postgre-field-0, postgre-field-1, postgre-field-2]
nebula.fields: [nebula-field-0, nebula-field-1, nebula-field-2]
source: {
field: mysql-field-0
# policy: "hash"
}
target: {
field: mysql-field-0
# policy: "hash"
}
ranking: postgre-field-1
batch: 256
partition: 32
}

# Oracle
{
name: edge-name-9
type: {
source: oracle
sink: client
}
url:"jdbc:oracle:thin:@host:1521:db"
driver: "oracle.jdbc.driver.OracleDriver"
user: "root"
passwd: "nebula"
table: "db.table"
sentence: "select oracle-field0, oracle-field1, oracle-field2 from table"
fields: [oracle-field-0, oracle-field-1, oracle-field-2]
nebula.fields: [nebula-field-0, nebula-field-1, nebula-field-2]
source: {
field: oracle-field-0
# policy: "hash"
}
target: {
field: oracle-field-1
}
ranking: oracle-field-2
batch: 256
partition: 32
}
]
}
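
To tie the Oracle blocks above back to the parsing code added in Configs.scala, here is a hedged sketch using Typesafe Config. The keys mirror the example entries and match what the parser reads (including passwd); the values are placeholders.

import com.typesafe.config.ConfigFactory

// A trimmed, hypothetical stand-in for one Oracle block from application.conf.
val oracleBlock = ConfigFactory.parseString(
  """
    |url = "jdbc:oracle:thin:@host:1521:db"
    |driver = "oracle.jdbc.driver.OracleDriver"
    |user = "root"
    |passwd = "nebula"
    |table = "db.table"
    |sentence = "select oracle-field0, oracle-field1, oracle-field2 from table"
  """.stripMargin)

// Mirrors the SourceCategory.ORACLE case in the Configs parser shown above.
val entry = OracleConfigEntry(
  SourceCategory.ORACLE,
  oracleBlock.getString("url"),
  oracleBlock.getString("driver"),
  oracleBlock.getString("user"),
  oracleBlock.getString("passwd"),
  oracleBlock.getString("table"),
  oracleBlock.getString("sentence")
)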