Skip to content

Commit

Permalink
support oracle datasource
Browse files Browse the repository at this point in the history
  • Loading branch information
Nicole00 committed Jan 17, 2022
1 parent 07ca437 commit 0612086
Show file tree
Hide file tree
Showing 10 changed files with 140 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ object SourceCategory extends Enumeration {
val HBASE = Value("HBASE")
val MAXCOMPUTE = Value("MAXCOMPUTE")
val CLICKHOUSE = Value("CLICKHOUSE")
val ORACLE = Value("ORACLE")

val SOCKET = Value("SOCKET")
val KAFKA = Value("KAFKA")
Expand Down Expand Up @@ -264,3 +265,19 @@ case class ClickHouseConfigEntry(override val category: SourceCategory.Value,
s"ClickHouse source {url:$url, user:$user, passwd:$passwd, numPartition:$numPartition, table:$table, sentence:$sentence}"
}
}

/**
* OracleConfigEntry
*/
/**
  * OracleConfigEntry holds the connection settings for an Oracle data source.
  *
  * @param category data source category; expected to be SourceCategory.ORACLE
  * @param url      JDBC connection url (e.g. jdbc:oracle:thin:@host:port:sid)
  * @param driver   fully qualified JDBC driver class name
  * @param user     database user name
  * @param passwd   database password
  * @param table    source table name (may be schema-qualified)
  * @param sentence optional SQL statement to run over the loaded table
  */
case class OracleConfigEntry(override val category: SourceCategory.Value,
                             url: String,
                             driver: String,
                             user: String,
                             passwd: String,
                             table: String,
                             override val sentence: String)
    extends ServerDataSourceConfigEntry {
  override def toString: String = {
    // Mask the password so credentials are never written to logs or console output.
    s"Oracle source {url:$url, driver:$driver, user:$user, passwd:***, table:$table, sentence:$sentence}"
  }
}
5 changes: 5 additions & 0 deletions nebula-exchange_spark_2.2/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -446,6 +446,11 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
  <groupId>com.oracle.database.jdbc</groupId>
  <!-- Spark 2.2 runs on Java 8; ojdbc11 is compiled for JDK 11+ and cannot be
       loaded on a Java 8 runtime. ojdbc8 targets JDK 8 and supports the same
       Oracle server versions. -->
  <artifactId>ojdbc8</artifactId>
  <version>21.4.0.0</version>
</dependency>
<dependency>
<groupId>com.vesoft</groupId>
<artifactId>exchange-common</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import com.vesoft.exchange.common.config.{
MaxComputeConfigEntry,
MySQLSourceConfigEntry,
Neo4JSourceConfigEntry,
OracleConfigEntry,
PulsarSourceConfigEntry,
SinkCategory,
SourceCategory
Expand All @@ -37,6 +38,7 @@ import com.vesoft.nebula.exchange.reader.{
MaxcomputeReader,
MySQLReader,
Neo4JReader,
OracleReader,
ORCReader,
ParquetReader,
PulsarReader
Expand Down Expand Up @@ -307,6 +309,11 @@ object Exchange {
val reader = new ClickhouseReader(session, clickhouseConfigEntry)
Some(reader.read())
}
case SourceCategory.ORACLE => {
val oracleConfig = config.asInstanceOf[OracleConfigEntry]
val reader = new OracleReader(session, oracleConfig)
Some(reader.read())
}
case _ => {
LOG.error(s"Data source ${config.category} not supported")
None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,7 @@
package com.vesoft.nebula.exchange.reader

import com.google.common.collect.Maps
import com.vesoft.exchange.common.config.{
ClickHouseConfigEntry,
HBaseSourceConfigEntry,
HiveSourceConfigEntry,
JanusGraphSourceConfigEntry,
MaxComputeConfigEntry,
MySQLSourceConfigEntry,
Neo4JSourceConfigEntry,
ServerDataSourceConfigEntry
}
import com.vesoft.exchange.common.config.{ClickHouseConfigEntry, HBaseSourceConfigEntry, HiveSourceConfigEntry, JanusGraphSourceConfigEntry, MaxComputeConfigEntry, MySQLSourceConfigEntry, Neo4JSourceConfigEntry, OracleConfigEntry, ServerDataSourceConfigEntry}
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
Expand All @@ -27,10 +18,7 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
import org.apache.spark.sql.types.{DataTypes, StructType}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.tinkerpop.gremlin.process.computer.clustering.peerpressure.{
ClusterCountMapReduce,
PeerPressureVertexProgram
}
import org.apache.tinkerpop.gremlin.process.computer.clustering.peerpressure.{ClusterCountMapReduce, PeerPressureVertexProgram}
import org.apache.tinkerpop.gremlin.spark.process.computer.SparkGraphComputer
import org.apache.tinkerpop.gremlin.spark.structure.io.PersistedOutputRDD
import org.apache.tinkerpop.gremlin.structure.util.GraphFactory
Expand Down Expand Up @@ -255,3 +243,31 @@ class ClickhouseReader(override val session: SparkSession,
df
}
}

/**
* Oracle reader
*/
/**
  * OracleReader loads data from an Oracle table through Spark's JDBC source.
  *
  * When a query sentence is configured, the loaded table is registered as a
  * temporary view and the sentence is executed over it; otherwise the whole
  * table is returned.
  */
class OracleReader(override val session: SparkSession, oracleConfig: OracleConfigEntry)
    extends ServerBaseReader(session, oracleConfig.sentence) {
  // Fail fast at construction time if the configured driver class is missing.
  Class.forName(oracleConfig.driver)

  override def read(): DataFrame = {
    val df = session.read
      .format("jdbc")
      .option("url", oracleConfig.url)
      .option("dbtable", oracleConfig.table)
      .option("user", oracleConfig.user)
      .option("password", oracleConfig.passwd)
      .option("driver", oracleConfig.driver)
      .load()

    if (oracleConfig.sentence == null) {
      df
    } else {
      // Register the table under its unqualified name. Take the LAST
      // dot-separated segment so schema-qualified names such as
      // "owner.schema.table" still yield the table name (index 1 would pick
      // the middle segment).
      val tableName = oracleConfig.table.split("\\.").last
      df.createOrReplaceTempView(tableName)
      session.sql(oracleConfig.sentence)
    }
  }
}

5 changes: 5 additions & 0 deletions nebula-exchange_spark_2.4/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -533,6 +533,11 @@
<artifactId>emr-maxcompute_2.11</artifactId>
<version>2.2.0</version>
</dependency>
<dependency>
  <groupId>com.oracle.database.jdbc</groupId>
  <!-- Spark 2.4 runs on Java 8; ojdbc11 is compiled for JDK 11+ and cannot be
       loaded on a Java 8 runtime. ojdbc8 targets JDK 8 and supports the same
       Oracle server versions. -->
  <artifactId>ojdbc8</artifactId>
  <version>21.4.0.0</version>
</dependency>
<dependency>
<groupId>com.vesoft</groupId>
<artifactId>exchange-common</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import com.vesoft.exchange.common.config.{
MaxComputeConfigEntry,
MySQLSourceConfigEntry,
Neo4JSourceConfigEntry,
OracleConfigEntry,
PulsarSourceConfigEntry,
SinkCategory,
SourceCategory
Expand All @@ -37,6 +38,7 @@ import com.vesoft.nebula.exchange.reader.{
MaxcomputeReader,
MySQLReader,
Neo4JReader,
OracleReader,
ORCReader,
ParquetReader,
PulsarReader
Expand Down Expand Up @@ -307,6 +309,11 @@ object Exchange {
val reader = new ClickhouseReader(session, clickhouseConfigEntry)
Some(reader.read())
}
case SourceCategory.ORACLE => {
val oracleConfig = config.asInstanceOf[OracleConfigEntry]
val reader = new OracleReader(session, oracleConfig)
Some(reader.read())
}
case _ => {
LOG.error(s"Data source ${config.category} not supported")
None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import com.vesoft.exchange.common.config.{
MaxComputeConfigEntry,
MySQLSourceConfigEntry,
Neo4JSourceConfigEntry,
OracleConfigEntry,
ServerDataSourceConfigEntry
}
import com.vesoft.exchange.common.utils.HDFSUtils
Expand Down Expand Up @@ -302,6 +303,7 @@ class ClickhouseReader(override val session: SparkSession,
clickHouseConfigEntry: ClickHouseConfigEntry)
extends ServerBaseReader(session, clickHouseConfigEntry.sentence) {
Class.forName("ru.yandex.clickhouse.ClickHouseDriver")

override def read(): DataFrame = {
val df = session.read
.format("jdbc")
Expand All @@ -315,3 +317,30 @@ class ClickhouseReader(override val session: SparkSession,
df
}
}

/**
* Oracle reader
*/
/**
  * OracleReader loads data from an Oracle table through Spark's JDBC source.
  *
  * When a query sentence is configured, the loaded table is registered as a
  * temporary view and the sentence is executed over it; otherwise the whole
  * table is returned.
  */
class OracleReader(override val session: SparkSession, oracleConfig: OracleConfigEntry)
    extends ServerBaseReader(session, oracleConfig.sentence) {
  // Fail fast at construction time if the configured driver class is missing.
  Class.forName(oracleConfig.driver)

  override def read(): DataFrame = {
    val df = session.read
      .format("jdbc")
      .option("url", oracleConfig.url)
      .option("dbtable", oracleConfig.table)
      .option("user", oracleConfig.user)
      .option("password", oracleConfig.passwd)
      .option("driver", oracleConfig.driver)
      .load()

    if (oracleConfig.sentence == null) {
      df
    } else {
      // Register the table under its unqualified name. Take the LAST
      // dot-separated segment so schema-qualified names such as
      // "owner.schema.table" still yield the table name (index 1 would pick
      // the middle segment).
      val tableName = oracleConfig.table.split("\\.").last
      df.createOrReplaceTempView(tableName)
      session.sql(oracleConfig.sentence)
    }
  }
}
6 changes: 5 additions & 1 deletion nebula-exchange_spark_3.0/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,11 @@
<artifactId>pulsar-spark-connector_2.12</artifactId>
<version>${pulsar.version}</version>
</dependency>

<dependency>
  <groupId>com.oracle.database.jdbc</groupId>
  <!-- Use ojdbc8 (JDK 8 target) so the driver also loads on Java 8 runtimes,
       which Spark 3.0 still supports; ojdbc11 requires JDK 11+. -->
  <artifactId>ojdbc8</artifactId>
  <version>21.4.0.0</version>
</dependency>
<dependency>
<groupId>com.vesoft</groupId>
<artifactId>exchange-common</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import com.vesoft.exchange.common.config.{
MaxComputeConfigEntry,
MySQLSourceConfigEntry,
Neo4JSourceConfigEntry,
OracleConfigEntry,
PulsarSourceConfigEntry,
SinkCategory,
SourceCategory
Expand All @@ -37,6 +38,7 @@ import com.vesoft.nebula.exchange.reader.{
MaxcomputeReader,
MySQLReader,
Neo4JReader,
OracleReader,
ORCReader,
ParquetReader,
PulsarReader
Expand Down Expand Up @@ -307,6 +309,11 @@ object Exchange {
val reader = new ClickhouseReader(session, clickhouseConfigEntry)
Some(reader.read())
}
case SourceCategory.ORACLE => {
val oracleConfig = config.asInstanceOf[OracleConfigEntry]
val reader = new OracleReader(session, oracleConfig)
Some(reader.read())
}
case _ => {
LOG.error(s"Data source ${config.category} not supported")
None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,7 @@

package com.vesoft.nebula.exchange.reader

import com.vesoft.exchange.common.config.{
ClickHouseConfigEntry,
HBaseSourceConfigEntry,
HiveSourceConfigEntry,
JanusGraphSourceConfigEntry,
MaxComputeConfigEntry,
MySQLSourceConfigEntry,
Neo4JSourceConfigEntry,
ServerDataSourceConfigEntry
}
import com.vesoft.exchange.common.config.{ClickHouseConfigEntry, HBaseSourceConfigEntry, HiveSourceConfigEntry, JanusGraphSourceConfigEntry, MaxComputeConfigEntry, MySQLSourceConfigEntry, Neo4JSourceConfigEntry, OracleConfigEntry, ServerDataSourceConfigEntry}
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
Expand Down Expand Up @@ -202,3 +193,30 @@ class ClickhouseReader(override val session: SparkSession,
df
}
}

/**
* Oracle reader
*/
/**
  * OracleReader loads data from an Oracle table through Spark's JDBC source.
  *
  * When a query sentence is configured, the loaded table is registered as a
  * temporary view and the sentence is executed over it; otherwise the whole
  * table is returned.
  */
class OracleReader(override val session: SparkSession, oracleConfig: OracleConfigEntry)
    extends ServerBaseReader(session, oracleConfig.sentence) {
  // Fail fast at construction time if the configured driver class is missing.
  Class.forName(oracleConfig.driver)

  override def read(): DataFrame = {
    val df = session.read
      .format("jdbc")
      .option("url", oracleConfig.url)
      .option("dbtable", oracleConfig.table)
      .option("user", oracleConfig.user)
      .option("password", oracleConfig.passwd)
      .option("driver", oracleConfig.driver)
      .load()

    if (oracleConfig.sentence == null) {
      df
    } else {
      // Register the table under its unqualified name. Take the LAST
      // dot-separated segment so schema-qualified names such as
      // "owner.schema.table" still yield the table name (index 1 would pick
      // the middle segment).
      val tableName = oracleConfig.table.split("\\.").last
      df.createOrReplaceTempView(tableName)
      session.sql(oracleConfig.sentence)
    }
  }
}

0 comments on commit 0612086

Please sign in to comment.