From 9da11d3f169832df3dbe4baeea7bd96c29c9e4e4 Mon Sep 17 00:00:00 2001 From: Anqi Date: Fri, 11 Feb 2022 11:12:08 +0800 Subject: [PATCH 1/4] update readme and workflow (#38) * update readme (#34) * fix workflow --- .github/workflows/release.yml | 4 ---- .github/workflows/snapshot.yml | 4 ---- README.md | 6 ++++-- README_CN.md | 6 ++++-- 4 files changed, 8 insertions(+), 12 deletions(-) diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index 1ce246c9..7132a0a7 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -38,10 +38,6 @@ jobs: popd popd - # deploy nebula-spark-connector after nebula-exchange, the parent artifactId nebula-spark cannot be overwrite. - - name: enter the nebula-spark-connector directory - run: pushd nebula-spark-connector - - name: Deploy release to Maven uses: samuelmeuli/action-maven-publish@v1 with: diff --git a/.github/workflows/snapshot.yml b/.github/workflows/snapshot.yml index b2015b4b..2fee6328 100644 --- a/.github/workflows/snapshot.yml +++ b/.github/workflows/snapshot.yml @@ -40,10 +40,6 @@ jobs: popd popd - # deploy nebula-spark-connector after nebula-exchange, the parent artifactId nebula-spark cannot be overwrite. - - name: enter the nebula-spark-connector directory - run: pushd nebula-spark-connector - - name: Deploy SNAPSHOT to Sonatype uses: samuelmeuli/action-maven-publish@v1 with: diff --git a/README.md b/README.md index f6ac5695..aa4ef43d 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,4 @@ -# Nebula Spark Connector 2.0 +# Nebula Spark Connector [中文版](https://github.com/vesoft-inc/nebula-spark-connector/blob/master/README_CN.md) ## Introduction @@ -152,7 +152,9 @@ There are the version correspondence between Nebula Spark Connector and Nebula: | 2.1.0 | 2.0.0, 2.0.1 | | 2.5.0 | 2.5.0, 2.5.1 | | 2.5.1 | 2.5.0, 2.5.1 | -| 2.6.0 | 2.6.0 | +| 2.6.0 | 2.6.0, 2.6.1 | +| 2.6.1 | 2.6.0, 2.6.1 | +| 3.0.0 | 3.0.0 | | 3.0-SNAPSHOT | nightly | ## Performance diff --git a/README_CN.md b/README_CN.md index d5f00a93..209eadf2 100644 --- a/README_CN.md +++ b/README_CN.md @@ -1,4 +1,4 @@ -# 欢迎使用 Nebula Spark Connector 2.0 +# 欢迎使用 Nebula Spark Connector [English](https://github.com/vesoft-inc/nebula-spark-connector/blob/master/README.md) ## 介绍 @@ -153,7 +153,9 @@ Nebula Spark Connector 和 Nebula 的版本对应关系如下: | 2.1.0 | 2.0.0, 2.0.1 | | 2.5.0 | 2.5.0, 2.5.1 | | 2.5.1 | 2.5.0, 2.5.1 | -| 2.6.0 | 2.6.0 | +| 2.6.0 | 2.6.0, 2.6.1 | +| 2.6.1 | 2.6.0, 2.6.1 | +| 3.0.0 | 3.0.0 | | 3.0-SNAPSHOT | nightly | ## 性能 From ad6d8656649d8016dd41060acabbd1f0854ac258 Mon Sep 17 00:00:00 2001 From: Anqi Date: Mon, 14 Feb 2022 17:21:59 +0800 Subject: [PATCH 2/4] update version to 3.0.0 (#33) --- README.md | 6 +++--- README_CN.md | 6 +++--- example/pom.xml | 2 +- nebula-spark-connector/pom.xml | 4 ++-- .../src/test/resources/docker-compose.yaml | 20 +++++++++---------- pom.xml | 2 +- 6 files changed, 20 insertions(+), 20 deletions(-) diff --git a/README.md b/README.md index aa4ef43d..252e0a2f 100644 --- a/README.md +++ b/README.md @@ -15,7 +15,7 @@ Nebula Spark Connector 2.0/3.0 only supports Nebula Graph 2.x/3.x. If you are us $ mvn clean package -Dmaven.test.skip=true -Dgpg.skip -Dmaven.javadoc.skip=true ``` - After the packaging, you can see the newly generated nebula-spark-connector-3.0-SNAPSHOT.jar under the nebula-spark-connector/nebula-spark-connector/target/ directory. + After the packaging, you can see the newly generated nebula-spark-connector-3.0.0.jar under the nebula-spark-connector/nebula-spark-connector/target/ directory. 
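For a quick check of the packaged jar, a minimal scan-style read looks like the sketch below. The addresses, space, and tag names are placeholders, and the implicit `NebulaDataFrameReader` import is assumed to come from this repo's package object; the builder methods match the `ReadNebulaConfig`/`NebulaConnectionConfig` API shown in the patches that follow.

```scala
import com.vesoft.nebula.connector.connector.NebulaDataFrameReader
import com.vesoft.nebula.connector.{NebulaConnectionConfig, ReadNebulaConfig}
import org.apache.spark.sql.SparkSession

object ReadVertexExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local").getOrCreate()

    // Scan-based reads only need the meta address (placeholder shown here).
    val connectionConfig = NebulaConnectionConfig.builder()
      .withMetaAddress("127.0.0.1:9559")
      .build()

    // An empty returnCols list reads all columns (see the ReadNebulaConfig scaladoc below).
    val readConfig = ReadNebulaConfig.builder()
      .withSpace("test")
      .withLabel("person")
      .withNoColumn(false)
      .withReturnCols(List())
      .withPartitionNum(10)
      .build()

    spark.read.nebula(connectionConfig, readConfig).loadVerticesToDF().show()
  }
}
```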
## New Features (Compared to Nebula Spark Connector 1.0) * Supports more connection configurations, such as timeout, connectionRetry, and executionRetry. @@ -33,7 +33,7 @@ Nebula Spark Connector 2.0/3.0 only supports Nebula Graph 2.x/3.x. If you are us com.vesoft nebula-spark-connector - 3.0-SNAPSHOT + 3.0.0 ``` @@ -154,7 +154,7 @@ There are the version correspondence between Nebula Spark Connector and Nebula: | 2.5.1 | 2.5.0, 2.5.1 | | 2.6.0 | 2.6.0, 2.6.1 | | 2.6.1 | 2.6.0, 2.6.1 | -| 3.0.0 | 3.0.0 | +| 3.0.0 | 3.0.0 | | 3.0-SNAPSHOT | nightly | ## Performance diff --git a/README_CN.md b/README_CN.md index 209eadf2..e97744af 100644 --- a/README_CN.md +++ b/README_CN.md @@ -14,7 +14,7 @@ Nebula Spark Connector 2.0/3.0 仅支持 Nebula Graph 2.x/3.x。如果您正在 $ mvn clean package -Dmaven.test.skip=true -Dgpg.skip -Dmaven.javadoc.skip=true ``` - 编译打包完成后,可以在 nebula-spark-connector/nebula-spark-connector/target/ 目录下看到 nebula-spark-connector-3.0-SNAPSHOT.jar 文件。 + 编译打包完成后,可以在 nebula-spark-connector/nebula-spark-connector/target/ 目录下看到 nebula-spark-connector-3.0.0.jar 文件。 ## 特性 @@ -32,7 +32,7 @@ Nebula Spark Connector 2.0/3.0 仅支持 Nebula Graph 2.x/3.x。如果您正在 com.vesoft nebula-spark-connector - 3.0-SNAPSHOT + 3.0.0 ``` @@ -155,7 +155,7 @@ Nebula Spark Connector 和 Nebula 的版本对应关系如下: | 2.5.1 | 2.5.0, 2.5.1 | | 2.6.0 | 2.6.0, 2.6.1 | | 2.6.1 | 2.6.0, 2.6.1 | -| 3.0.0 | 3.0.0 | +| 3.0.0 | 3.0.0 | | 3.0-SNAPSHOT | nightly | ## 性能 diff --git a/example/pom.xml b/example/pom.xml index 47807d1d..cc2f1dc5 100644 --- a/example/pom.xml +++ b/example/pom.xml @@ -5,7 +5,7 @@ nebula-spark com.vesoft - 3.0-SNAPSHOT + 3.0.0 ../pom.xml 4.0.0 diff --git a/nebula-spark-connector/pom.xml b/nebula-spark-connector/pom.xml index ace6b2e6..f3117d69 100644 --- a/nebula-spark-connector/pom.xml +++ b/nebula-spark-connector/pom.xml @@ -5,7 +5,7 @@ nebula-spark com.vesoft - 3.0-SNAPSHOT + 3.0.0 ../pom.xml 4.0.0 @@ -14,7 +14,7 @@ 2.4.4 - 3.0-SNAPSHOT + 3.0.0 1.8 1.8 3.2.3 diff --git a/nebula-spark-connector/src/test/resources/docker-compose.yaml b/nebula-spark-connector/src/test/resources/docker-compose.yaml index bce1badb..c2d70289 100644 --- a/nebula-spark-connector/src/test/resources/docker-compose.yaml +++ b/nebula-spark-connector/src/test/resources/docker-compose.yaml @@ -1,7 +1,7 @@ version: '3.4' services: metad0: - image: vesoft/nebula-metad:nightly + image: vesoft/nebula-metad:v3.0.0 environment: USER: root TZ: "${TZ}" @@ -36,7 +36,7 @@ services: - SYS_PTRACE metad1: - image: vesoft/nebula-metad:nightly + image: vesoft/nebula-metad:v3.0.0 environment: USER: root TZ: "${TZ}" @@ -71,7 +71,7 @@ services: - SYS_PTRACE metad2: - image: vesoft/nebula-metad:nightly + image: vesoft/nebula-metad:v3.0.0 environment: USER: root TZ: "${TZ}" @@ -106,7 +106,7 @@ services: - SYS_PTRACE storaged0: - image: vesoft/nebula-storaged:nightly + image: vesoft/nebula-storaged:v3.0.0 environment: USER: root TZ: "${TZ}" @@ -145,7 +145,7 @@ services: - SYS_PTRACE storaged1: - image: vesoft/nebula-storaged:nightly + image: vesoft/nebula-storaged:v3.0.0 environment: USER: root TZ: "${TZ}" @@ -184,7 +184,7 @@ services: - SYS_PTRACE storaged2: - image: vesoft/nebula-storaged:nightly + image: vesoft/nebula-storaged:v3.0.0 environment: USER: root TZ: "${TZ}" @@ -223,7 +223,7 @@ services: - SYS_PTRACE graphd0: - image: vesoft/nebula-graphd:nightly + image: vesoft/nebula-graphd:v3.0.0 environment: USER: root TZ: "${TZ}" @@ -259,7 +259,7 @@ services: - SYS_PTRACE graphd1: - image: vesoft/nebula-graphd:nightly + image: vesoft/nebula-graphd:v3.0.0 environment: USER: 
root TZ: "${TZ}" @@ -295,7 +295,7 @@ services: - SYS_PTRACE graphd2: - image: vesoft/nebula-graphd:nightly + image: vesoft/nebula-graphd:v3.0.0 environment: USER: root TZ: "${TZ}" @@ -331,7 +331,7 @@ services: - SYS_PTRACE console: - image: vesoft/nebula-console:nightly + image: vesoft/nebula-console:v3.0.0 entrypoint: "" command: - sh diff --git a/pom.xml b/pom.xml index 01d76c89..52621529 100644 --- a/pom.xml +++ b/pom.xml @@ -7,7 +7,7 @@ com.vesoft nebula-spark pom - 3.0-SNAPSHOT + 3.0.0 UTF-8 From 20f0a8deed313f6de4cefc05cfe22cbc1b98975d Mon Sep 17 00:00:00 2001 From: MeeCreeps Date: Fri, 2 Sep 2022 20:50:01 +0800 Subject: [PATCH 3/4] add edge reader by ngql --- .../nebula/connector/NebulaConfig.scala | 54 +++++-- .../nebula/connector/NebulaOptions.scala | 5 + .../com/vesoft/nebula/connector/package.scala | 38 +++++ .../NebulaNgqlEdgePartitionReader.scala | 152 ++++++++++++++++++ .../connector/reader/NebulaPartition.scala | 6 + .../connector/reader/NebulaSourceReader.scala | 13 ++ 6 files changed, 254 insertions(+), 14 deletions(-) create mode 100644 nebula-spark-connector/src/main/scala/com/vesoft/nebula/connector/reader/NebulaNgqlEdgePartitionReader.scala diff --git a/nebula-spark-connector/src/main/scala/com/vesoft/nebula/connector/NebulaConfig.scala b/nebula-spark-connector/src/main/scala/com/vesoft/nebula/connector/NebulaConfig.scala index c470c54e..0c7d0ad2 100644 --- a/nebula-spark-connector/src/main/scala/com/vesoft/nebula/connector/NebulaConfig.scala +++ b/nebula-spark-connector/src/main/scala/com/vesoft/nebula/connector/NebulaConfig.scala @@ -656,20 +656,35 @@ object WriteNebulaEdgeConfig { * you can set noColumn to true to read no vertex col, and you can set returnCols to read the specific cols, if the returnCols is empty, then read all the columns. * you can set partitionNum to define spark partition nums to read nebula graph. 
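 * you can set ngql to read edge data with a query statement; when ngql is set, the read runs in a single partition (the ngql constructor below fixes partitionNum to 1).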
 */
-class ReadNebulaConfig(space: String,
-                       label: String,
-                       returnCols: List[String],
-                       noColumn: Boolean,
-                       partitionNum: Int,
-                       limit: Int)
-    extends Serializable {
-  def getSpace        = space
-  def getLabel        = label
-  def getReturnCols   = returnCols
-  def getNoColumn     = noColumn
-  def getPartitionNum = partitionNum
-  def getLimit        = limit
+class ReadNebulaConfig extends Serializable {
+  var getSpace: String            = _
+  var getLabel: String            = _
+  var getReturnCols: List[String] = _
+  var getNoColumn: Boolean        = _
+  var getPartitionNum: Int        = _
+  var getLimit: Int               = _
+  var getNgql: String             = _ // todo add filter
+  def this(space: String, label: String, returnCols: List[String], noColumn: Boolean, partitionNum: Int, limit: Int) = {
+    this()
+    this.getSpace = space
+    this.getLabel = label
+    this.getReturnCols = returnCols
+    this.getNoColumn = noColumn
+    this.getPartitionNum = partitionNum
+    this.getLimit = limit
+  }
+
+  def this(space: String, label: String, returnCols: List[String], noColumn: Boolean, ngql: String, limit: Int) = {
+    this()
+    this.getNgql = ngql
+    this.getSpace = space
+    this.getLabel = label
+    this.getReturnCols = returnCols
+    this.getNoColumn = noColumn
+    this.getLimit = limit
+    this.getPartitionNum = 1
+  }
 }

 /**
@@ -685,6 +700,7 @@ object ReadNebulaConfig {
     var noColumn: Boolean = false
     var partitionNum: Int = 100
     var limit: Int        = 1000
+    var ngql: String      = _

     def withSpace(space: String): ReadConfigBuilder = {
       this.space = space
@@ -726,9 +742,18 @@
       this
     }

+    def withNgql(ngql: String): ReadConfigBuilder = {
+      this.ngql = ngql
+      this
+    }
+
     def build(): ReadNebulaConfig = {
       check()
-      new ReadNebulaConfig(space, label, returnCols.toList, noColumn, partitionNum, limit)
+      if (ngql != null && !ngql.isEmpty) {
+        new ReadNebulaConfig(space, label, returnCols.toList, noColumn, ngql, limit)
+      } else {
+        new ReadNebulaConfig(space, label, returnCols.toList, noColumn, partitionNum, limit)
+      }
     }

     private def check(): Unit = {
diff --git a/nebula-spark-connector/src/main/scala/com/vesoft/nebula/connector/NebulaOptions.scala b/nebula-spark-connector/src/main/scala/com/vesoft/nebula/connector/NebulaOptions.scala
index 11051a68..c41153e7 100644
--- a/nebula-spark-connector/src/main/scala/com/vesoft/nebula/connector/NebulaOptions.scala
+++ b/nebula-spark-connector/src/main/scala/com/vesoft/nebula/connector/NebulaOptions.scala
@@ -109,11 +109,13 @@ class NebulaOptions(@transient val parameters: CaseInsensitiveMap[String])(
   var partitionNums: String = _
   var noColumn: Boolean     = _
   var limit: Int            = _
+  var ngql: String          = _
   if (operaType == OperaType.READ) {
     returnCols = parameters(RETURN_COLS)
     noColumn = parameters.getOrElse(NO_COLUMN, false).toString.toBoolean
     partitionNums = parameters(PARTITION_NUMBER)
     limit = parameters.getOrElse(LIMIT, DEFAULT_LIMIT).toString.toInt
+    ngql = parameters.getOrElse(NGQL, EMPTY_STRING)
   }

   /** write parameters */
@@ -234,6 +236,9 @@ object NebulaOptions {
   val PARTITION_NUMBER: String = "partitionNumber"
   val LIMIT: String            = "limit"

+  /** read by ngql */
+  val NGQL: String = "ngql"
+
   /** write config */
   val RATE_LIMIT: String = "rateLimit"
   val VID_POLICY: String = "vidPolicy"
diff --git a/nebula-spark-connector/src/main/scala/com/vesoft/nebula/connector/package.scala b/nebula-spark-connector/src/main/scala/com/vesoft/nebula/connector/package.scala
index 59482660..809f79e8 100644
--- a/nebula-spark-connector/src/main/scala/com/vesoft/nebula/connector/package.scala
+++ b/nebula-spark-connector/src/main/scala/com/vesoft/nebula/connector/package.scala
@@ -172,6 
+172,44 @@ package object connector {
     dfReader.load()
   }

+  /**
+    * Reading edges from Nebula Graph by ngql
+    * @return DataFrame
+    */
+  def loadEdgesToDfByNgql(): DataFrame = {
+    assert(connectionConfig != null && readConfig != null,
+           "nebula config is not set, please call nebula() before loadEdgesToDfByNgql")
+
+    val dfReader = reader
+      .format(classOf[NebulaDataSource].getName)
+      .option(NebulaOptions.TYPE, DataTypeEnum.EDGE.toString)
+      .option(NebulaOptions.SPACE_NAME, readConfig.getSpace)
+      .option(NebulaOptions.LABEL, readConfig.getLabel)
+      .option(NebulaOptions.RETURN_COLS, readConfig.getReturnCols.mkString(","))
+      .option(NebulaOptions.NO_COLUMN, readConfig.getNoColumn)
+      .option(NebulaOptions.LIMIT, readConfig.getLimit)
+      .option(NebulaOptions.PARTITION_NUMBER, readConfig.getPartitionNum)
+      .option(NebulaOptions.NGQL, readConfig.getNgql)
+      .option(NebulaOptions.META_ADDRESS, connectionConfig.getMetaAddress)
+      .option(NebulaOptions.TIMEOUT, connectionConfig.getTimeout)
+      .option(NebulaOptions.CONNECTION_RETRY, connectionConfig.getConnectionRetry)
+      .option(NebulaOptions.EXECUTION_RETRY, connectionConfig.getExecRetry)
+      .option(NebulaOptions.ENABLE_META_SSL, connectionConfig.getEnableMetaSSL)
+      .option(NebulaOptions.ENABLE_STORAGE_SSL, connectionConfig.getEnableStorageSSL)
+
+    if (connectionConfig.getEnableStorageSSL || connectionConfig.getEnableMetaSSL) {
+      dfReader.option(NebulaOptions.SSL_SIGN_TYPE, connectionConfig.getSignType)
+      SSLSignType.withName(connectionConfig.getSignType) match {
+        case SSLSignType.CA =>
+          dfReader.option(NebulaOptions.CA_SIGN_PARAM, connectionConfig.getCaSignParam)
+        case SSLSignType.SELF =>
+          dfReader.option(NebulaOptions.SELF_SIGN_PARAM, connectionConfig.getSelfSignParam)
+      }
+    }
+
+    dfReader.load()
+  }
+
   /**
     * read nebula vertex edge to graphx's vertex
     * use hash() for String type vertex id. 
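A minimal sketch of driving this new entry point, continuing the SparkSession from the earlier example. The space, edge name, addresses, and the ngql statement are placeholders, and withGraphAddress is assumed to be the builder counterpart of the getGraphAddress wired in by patch 4 below:

```scala
import com.vesoft.nebula.connector.connector.NebulaDataFrameReader
import com.vesoft.nebula.connector.{NebulaConnectionConfig, ReadNebulaConfig}

// ngql reads execute through graphd, so a graph address is needed in
// addition to the meta address (patch 4 below enforces this with a require()).
val connectionConfig = NebulaConnectionConfig.builder()
  .withMetaAddress("127.0.0.1:9559")
  .withGraphAddress("127.0.0.1:9669")
  .build()

// The statement must yield edges of the configured label; the reader keeps
// only relationships whose edgeName() matches the label.
val readConfig = ReadNebulaConfig.builder()
  .withSpace("test")
  .withLabel("friend")
  .withNgql("GO FROM \"person1\" OVER friend YIELD EDGE AS e")
  .build()

val edgeDf = spark.read.nebula(connectionConfig, readConfig).loadEdgesToDfByNgql()
```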
diff --git a/nebula-spark-connector/src/main/scala/com/vesoft/nebula/connector/reader/NebulaNgqlEdgePartitionReader.scala b/nebula-spark-connector/src/main/scala/com/vesoft/nebula/connector/reader/NebulaNgqlEdgePartitionReader.scala new file mode 100644 index 00000000..3586238a --- /dev/null +++ b/nebula-spark-connector/src/main/scala/com/vesoft/nebula/connector/reader/NebulaNgqlEdgePartitionReader.scala @@ -0,0 +1,152 @@ +package com.vesoft.nebula.connector.reader + +import com.vesoft.nebula.Value +import com.vesoft.nebula.client.graph.data.{Relationship, ResultSet, ValueWrapper} +import com.vesoft.nebula.connector.NebulaUtils.NebulaValueGetter +import com.vesoft.nebula.connector.nebula.GraphProvider +import com.vesoft.nebula.connector.{NebulaOptions, NebulaUtils} +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow +import org.apache.spark.sql.sources.v2.reader.InputPartitionReader +import org.apache.spark.sql.types.StructType +import org.slf4j.{Logger, LoggerFactory} + +import scala.collection.JavaConversions.asScalaBuffer +import scala.collection.mutable +import scala.collection.mutable.ListBuffer + +/** + * create reader by ngql + */ +class NebulaNgqlEdgePartitionReader extends InputPartitionReader[InternalRow] { + + private val LOG: Logger = LoggerFactory.getLogger(this.getClass) + + private var nebulaOptions: NebulaOptions = _ + private var graphProvider: GraphProvider = _ + private var schema: StructType = _ + private var resultSet: ResultSet = _ + private var edgeIterator: Iterator[ListBuffer[ValueWrapper]] = _ + + def this(nebulaOptions: NebulaOptions, schema: StructType) { + this() + this.schema = schema + this.nebulaOptions = nebulaOptions + this.graphProvider = new GraphProvider( + nebulaOptions.getGraphAddress, + nebulaOptions.timeout, + nebulaOptions.enableGraphSSL, + nebulaOptions.sslSignType, + nebulaOptions.caSignParam, + nebulaOptions.selfSignParam + ) + // add exception when session build failed + graphProvider.switchSpace(nebulaOptions.user, nebulaOptions.passwd, nebulaOptions.spaceName) + resultSet = graphProvider.submit(nebulaOptions.ngql) + edgeIterator = query() + } + + def query(): Iterator[ListBuffer[ValueWrapper]] = { + val edges: ListBuffer[ListBuffer[ValueWrapper]] = new ListBuffer[ListBuffer[ValueWrapper]] + val properties = nebulaOptions.getReturnCols + for (i <- 0 until resultSet.rowsSize()) { + val rowValues = resultSet.rowValues(i).values() + for (j <- 0 until rowValues.size()) { + val value = rowValues.get(j) + val valueType = value.getValue.getSetField + if (valueType == Value.EVAL) { + val relationship = value.asRelationship() + if (checkLabel(relationship)) { + edges.append(convertToEdge(relationship, properties)) + } + } else if (valueType == Value.LVAL) { + val list: mutable.Buffer[ValueWrapper] = value.asList() + edges.appendAll( + list.toStream.filter(e => checkLabel(e.asRelationship())) + .map(e => convertToEdge(e.asRelationship(), properties)) + ) + } else { + LOG.error(s"Exception convert edge type ${valueType} ") + throw new RuntimeException(" convert value type failed"); + } + } + } + edges.iterator + } + + def checkLabel(relationship: Relationship): Boolean = { + this.nebulaOptions.label.equals(relationship.edgeName()) + } + + def convertToEdge(relationship: Relationship, properties: List[String]): ListBuffer[ValueWrapper] = { + val edge: ListBuffer[ValueWrapper] = new ListBuffer[ValueWrapper] + edge.append(relationship.srcId()) + edge.append(relationship.dstId()) + 
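+      // 3 selects the i64 (iVal) field of the Value union, so the edge ranking is wrapped as an integer value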
edge.append(new ValueWrapper(new Value(3, relationship.ranking()), "utf-8")) + if (properties == null || properties.isEmpty) + return edge + else { + for (i <- properties.indices) { + edge.append(relationship.properties().get(properties(i))) + } + } + edge + } + + + override def next(): Boolean = { + edgeIterator.hasNext + } + + override def get(): InternalRow = { + val getters: Array[NebulaValueGetter] = NebulaUtils.makeGetters(schema) + val mutableRow = new SpecificInternalRow(schema.fields.map(x => x.dataType)) + + val edge = edgeIterator.next(); + for (i <- getters.indices) { + val value: ValueWrapper = edge(i) + var resolved = false + if (value.isNull) { + mutableRow.setNullAt(i) + resolved = true + } + if (value.isString) { + getters(i).apply(value.asString(), mutableRow, i) + resolved = true + } + if (value.isDate) { + getters(i).apply(value.asDate(), mutableRow, i) + resolved = true + } + if (value.isTime) { + getters(i).apply(value.asTime(), mutableRow, i) + resolved = true + } + if (value.isDateTime) { + getters(i).apply(value.asDateTime(), mutableRow, i) + resolved = true + } + if (value.isLong) { + getters(i).apply(value.asLong(), mutableRow, i) + } + if (value.isBoolean) { + getters(i).apply(value.asBoolean(), mutableRow, i) + } + if (value.isDouble) { + getters(i).apply(value.asDouble(), mutableRow, i) + } + if (value.isGeography) { + getters(i).apply(value.asGeography(), mutableRow, i) + } + if (value.isDuration) { + getters(i).apply(value.asDuration(), mutableRow, i) + } + } + mutableRow + + } + + override def close(): Unit = { + graphProvider.close(); + } +} diff --git a/nebula-spark-connector/src/main/scala/com/vesoft/nebula/connector/reader/NebulaPartition.scala b/nebula-spark-connector/src/main/scala/com/vesoft/nebula/connector/reader/NebulaPartition.scala index b702be89..2b48794b 100644 --- a/nebula-spark-connector/src/main/scala/com/vesoft/nebula/connector/reader/NebulaPartition.scala +++ b/nebula-spark-connector/src/main/scala/com/vesoft/nebula/connector/reader/NebulaPartition.scala @@ -21,3 +21,9 @@ class NebulaEdgePartition(index: Int, nebulaOptions: NebulaOptions, schema: Stru override def createPartitionReader(): InputPartitionReader[InternalRow] = new NebulaEdgePartitionReader(index, nebulaOptions, schema) } + +class NebulaNgqlEdgePartition(nebulaOptions: NebulaOptions, schema: StructType) + extends InputPartition[InternalRow] { + override def createPartitionReader(): InputPartitionReader[InternalRow] = + new NebulaNgqlEdgePartitionReader(nebulaOptions, schema) +} \ No newline at end of file diff --git a/nebula-spark-connector/src/main/scala/com/vesoft/nebula/connector/reader/NebulaSourceReader.scala b/nebula-spark-connector/src/main/scala/com/vesoft/nebula/connector/reader/NebulaSourceReader.scala index f6da55e4..7bceff35 100644 --- a/nebula-spark-connector/src/main/scala/com/vesoft/nebula/connector/reader/NebulaSourceReader.scala +++ b/nebula-spark-connector/src/main/scala/com/vesoft/nebula/connector/reader/NebulaSourceReader.scala @@ -131,3 +131,16 @@ class NebulaDataSourceEdgeReader(nebulaOptions: NebulaOptions) partitions.map(_.asInstanceOf[InputPartition[InternalRow]]).asJava } } + +/** + * DataSourceReader for Nebula Edge by ngql + */ +class NebulaDataSourceNgqlEdgeReader(nebulaOptions: NebulaOptions) + extends NebulaSourceReader(nebulaOptions) { + + override def planInputPartitions(): util.List[InputPartition[InternalRow]] = { + val list = new util.ArrayList[InputPartition[InternalRow]]() + list.add(new NebulaNgqlEdgePartition(nebulaOptions,getSchema)) + 
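+    // ngql reads always plan exactly one partition: the statement is executed once through graphd (ReadNebulaConfig fixes partitionNum to 1 for ngql)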
    list
+  }
+}
\ No newline at end of file

From f2f90434531f3eac01fc55a4eb8efd8e40686566 Mon Sep 17 00:00:00 2001
From: MeeCreeps
Date: Fri, 9 Sep 2022 23:53:57 +0800
Subject: [PATCH 4/4] update option for ngql

---
 .../com/vesoft/nebula/connector/NebulaDataSource.scala    | 5 +++--
 .../scala/com/vesoft/nebula/connector/NebulaOptions.scala | 5 +++++
 .../main/scala/com/vesoft/nebula/connector/package.scala  | 1 +
 3 files changed, 9 insertions(+), 2 deletions(-)

diff --git a/nebula-spark-connector/src/main/scala/com/vesoft/nebula/connector/NebulaDataSource.scala b/nebula-spark-connector/src/main/scala/com/vesoft/nebula/connector/NebulaDataSource.scala
index 307441de..3af93127 100644
--- a/nebula-spark-connector/src/main/scala/com/vesoft/nebula/connector/NebulaDataSource.scala
+++ b/nebula-spark-connector/src/main/scala/com/vesoft/nebula/connector/NebulaDataSource.scala
@@ -7,9 +7,8 @@ package com.vesoft.nebula.connector

 import java.util.Map.Entry
 import java.util.Optional
-
 import com.vesoft.nebula.connector.exception.IllegalOptionException
-import com.vesoft.nebula.connector.reader.{NebulaDataSourceEdgeReader, NebulaDataSourceVertexReader}
+import com.vesoft.nebula.connector.reader.{NebulaDataSourceEdgeReader, NebulaDataSourceNgqlEdgeReader, NebulaDataSourceVertexReader}
 import com.vesoft.nebula.connector.writer.{NebulaDataSourceEdgeWriter, NebulaDataSourceVertexWriter}
 import org.apache.spark.sql.SaveMode
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
@@ -46,6 +45,8 @@
     if (DataTypeEnum.VERTEX == DataTypeEnum.withName(dataType)) {
       new NebulaDataSourceVertexReader(nebulaOptions)
+    } else if (nebulaOptions.ngql != null && nebulaOptions.ngql.nonEmpty) {
+      new NebulaDataSourceNgqlEdgeReader(nebulaOptions)
     } else {
       new NebulaDataSourceEdgeReader(nebulaOptions)
     }
diff --git a/nebula-spark-connector/src/main/scala/com/vesoft/nebula/connector/NebulaOptions.scala b/nebula-spark-connector/src/main/scala/com/vesoft/nebula/connector/NebulaOptions.scala
index c41153e7..ce9326fe 100644
--- a/nebula-spark-connector/src/main/scala/com/vesoft/nebula/connector/NebulaOptions.scala
+++ b/nebula-spark-connector/src/main/scala/com/vesoft/nebula/connector/NebulaOptions.scala
@@ -116,6 +116,11 @@
     partitionNums = parameters(PARTITION_NUMBER)
     limit = parameters.getOrElse(LIMIT, DEFAULT_LIMIT).toString.toInt
     ngql = parameters.getOrElse(NGQL, EMPTY_STRING)
+    if (ngql != EMPTY_STRING) {
+      require(parameters.isDefinedAt(GRAPH_ADDRESS),
+              s"option $GRAPH_ADDRESS is required for ngql and cannot be blank")
+      graphAddress = parameters(GRAPH_ADDRESS)
+    }
   }

   /** write parameters */
diff --git a/nebula-spark-connector/src/main/scala/com/vesoft/nebula/connector/package.scala b/nebula-spark-connector/src/main/scala/com/vesoft/nebula/connector/package.scala
index 809f79e8..ad572754 100644
--- a/nebula-spark-connector/src/main/scala/com/vesoft/nebula/connector/package.scala
+++ b/nebula-spark-connector/src/main/scala/com/vesoft/nebula/connector/package.scala
@@ -191,6 +191,7 @@
       .option(NebulaOptions.PARTITION_NUMBER, readConfig.getPartitionNum)
       .option(NebulaOptions.NGQL, readConfig.getNgql)
       .option(NebulaOptions.META_ADDRESS, connectionConfig.getMetaAddress)
+      .option(NebulaOptions.GRAPH_ADDRESS, connectionConfig.getGraphAddress)
       .option(NebulaOptions.TIMEOUT, connectionConfig.getTimeout)
       .option(NebulaOptions.CONNECTION_RETRY, 
connectionConfig.getConnectionRetry) .option(NebulaOptions.EXECUTION_RETRY, connectionConfig.getExecRetry)
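Downstream, an ngql read behaves like any other DataFrame source. A short sketch, continuing the edgeDf from the earlier example; the column layout (source id, destination id, rank, then the requested properties) follows convertToEdge above:

```scala
// Row layout from convertToEdge: srcId, dstId, ranking, then property columns.
edgeDf.printSchema()
edgeDf.show(20)
println(s"edges returned by the ngql statement: ${edgeDf.count()}")
```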