support custom rules for datasource #139

Merged
merged 2 commits on May 17, 2023
@@ -16,6 +16,7 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FSDataInputStream, FileSystem, Path}
import org.apache.log4j.Logger

import scala.collection.JavaConversions.asScalaBuffer
import scala.collection.mutable
import scala.collection.mutable.ListBuffer
import scala.collection.JavaConverters._
@@ -166,6 +167,12 @@ case class CaSignParam(caCrtFilePath: String, crtFilePath: String, keyFilePath:

case class SelfSignParam(crtFilePath: String, keyFilePath: String, password: String)

case class UdfConfigEntry(sep: String, oldColNames: List[String], newColName: String) {
override def toString(): String = {
s"sep:$sep, oldColNames: $oldColNames, newColName: $newColName"
}
}

/**
*
*/
@@ -431,6 +438,13 @@ object Configs {
val repartitionWithNebula = getOrElse(tagConfig, "repartitionWithNebula", true)
val ignoreIndex = getOrElse(tagConfig, "ignoreIndex", false)

val vertexUdf = if (tagConfig.hasPath("vertex.udf")) {
val sep = tagConfig.getString("vertex.udf.separator")
val cols: List[String] = tagConfig.getStringList("vertex.udf.oldColNames").toList
val newCol = tagConfig.getString("vertex.udf.newColName")
Some(UdfConfigEntry(sep, cols, newCol))
} else None

LOG.info(s"name ${tagName} batch ${batch}")
val entry = TagConfigEntry(
tagName,
@@ -445,7 +459,8 @@ object Configs {
checkPointPath,
repartitionWithNebula,
enableTagless,
ignoreIndex
ignoreIndex,
vertexUdf
)
LOG.info(s"Tag Config: ${entry}")
tags += entry
@@ -553,6 +568,20 @@ object Configs {
val repartitionWithNebula = getOrElse(edgeConfig, "repartitionWithNebula", false)
val ignoreIndex = getOrElse(edgeConfig, "ignoreIndex", false)

val srcUdf = if (edgeConfig.hasPath("source.udf")) {
val sep = edgeConfig.getString("source.udf.separator")
val cols: List[String] = edgeConfig.getStringList("source.udf.oldColNames").toList
val newCol = edgeConfig.getString("source.udf.newColName")
Some(UdfConfigEntry(sep, cols, newCol))
} else None

val dstUdf = if (edgeConfig.hasPath("target.udf")) {
val sep = edgeConfig.getString("target.udf.separator")
val cols: List[String] = edgeConfig.getStringList("target.udf.oldColNames").toList
val newCol = edgeConfig.getString("target.udf.newColName")
Some(UdfConfigEntry(sep, cols, newCol))
} else None

val entry = EdgeConfigEntry(
edgeName,
sourceConfig,
@@ -571,7 +600,9 @@ object Configs {
partition,
checkPointPath,
repartitionWithNebula,
ignoreIndex
ignoreIndex,
srcUdf,
dstUdf
)
LOG.info(s"Edge Config: ${entry}")
edges += entry
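The block above reads three keys from an optional udf section under vertex (and, further down, under source and target for edges): separator, oldColNames and newColName. A minimal sketch of the mapping, with hypothetical column names:

// Illustrative only: a tag config such as
//   vertex: {
//     field: full_name
//     udf: {
//       separator: "_"
//       oldColNames: [first_name, last_name]
//       newColName: full_name
//     }
//   }
// is parsed by the code above into
val vertexUdf: Option[UdfConfigEntry] =
  Some(UdfConfigEntry("_", List("first_name", "last_name"), "full_name"))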
@@ -62,7 +62,8 @@ case class TagConfigEntry(override val name: String,
override val checkPointPath: Option[String],
repartitionWithNebula: Boolean = true,
enableTagless: Boolean = false,
ignoreIndex: Boolean = false)
ignoreIndex: Boolean = false,
vertexUdf: Option[UdfConfigEntry] = None)
extends SchemaConfigEntry {
require(
name.trim.nonEmpty && vertexField.trim.nonEmpty
@@ -77,7 +78,9 @@ case class TagConfigEntry(override val name: String,
s"batch: $batch, " +
s"partition: $partition, " +
s"repartitionWithNebula: $repartitionWithNebula, " +
s"enableTagless: $enableTagless."
s"enableTagless: $enableTagless, " +
s"ignoreIndex: $ignoreIndex, " +
s"vertexUdf: $vertexUdf."
}
}

@@ -117,7 +120,9 @@ case class EdgeConfigEntry(override val name: String,
override val partition: Int,
override val checkPointPath: Option[String],
repartitionWithNebula: Boolean = false,
ignoreIndex: Boolean = false)
ignoreIndex: Boolean = false,
srcVertexUdf: Option[UdfConfigEntry] = None,
dstVertexUdf: Option[UdfConfigEntry] = None)
extends SchemaConfigEntry {
require(
name.trim.nonEmpty && sourceField.trim.nonEmpty && targetField.trim.nonEmpty
@@ -136,7 +141,10 @@ case class EdgeConfigEntry(override val name: String,
s"target field: $targetField, " +
s"target policy: $targetPolicy, " +
s"batch: $batch, " +
s"partition: $partition."
s"partition: $partition, " +
s"ignoreIndex: $ignoreIndex, " +
s"srcVertexUdf: $srcVertexUdf" +
s"dstVertexUdf: $dstVertexUdf."
} else {
s"Edge name: $name, " +
s"source: $dataSourceConfigEntry, " +
@@ -147,7 +155,10 @@ case class EdgeConfigEntry(override val name: String,
s"target field: $targetField, " +
s"target policy: $targetPolicy, " +
s"batch: $batch, " +
s"partition: $partition."
s"partition: $partition, " +
s"ignoreIndex: $ignoreIndex, " +
s"srcVertexUdf: $srcVertexUdf" +
s"dstVertexUdf: $dstVertexUdf."
}
}
}
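Both new parameters default to None, so existing tag and edge configs without a udf block keep working unchanged. A minimal sketch of consuming the optional entry (hypothetical values; in the PR they come from the parsed config):

val udf: Option[UdfConfigEntry] =
  Some(UdfConfigEntry("_", List("first_name", "last_name"), "full_name"))
udf match {
  case Some(UdfConfigEntry(sep, oldCols, newCol)) =>
    println(s"concatenate ${oldCols.mkString(", ")} with '$sep' into $newCol")
  case None =>
    println("no udf configured, keep the id columns as-is")
}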
@@ -5,7 +5,7 @@

package com.vesoft.nebula.exchange

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.{Column, DataFrame, SparkSession}
import java.io.File

import com.vesoft.exchange.Argument
@@ -27,7 +27,8 @@ import com.vesoft.exchange.common.config.{
PostgreSQLSourceConfigEntry,
PulsarSourceConfigEntry,
SinkCategory,
SourceCategory
SourceCategory,
UdfConfigEntry
}
import com.vesoft.nebula.exchange.reader.{
CSVReader,
@@ -51,8 +52,12 @@ import com.vesoft.exchange.common.processor.ReloadProcessor
import com.vesoft.exchange.common.utils.SparkValidate
import com.vesoft.nebula.exchange.processor.{EdgeProcessor, VerticesProcessor}
import org.apache.log4j.Logger
import org.apache.spark.sql.functions.{col, concat_ws}
import org.apache.spark.sql.types.StringType
import org.apache.spark.{SparkConf, SparkEnv}

import scala.collection.mutable.ListBuffer

final case class TooManyErrorsException(private val message: String) extends Exception(message)

/**
@@ -142,8 +147,13 @@ object Exchange {
val fields = tagConfig.vertexField :: tagConfig.fields
val data = createDataSource(spark, tagConfig.dataSourceConfigEntry, fields)
if (data.isDefined && !c.dry) {
data.get.cache()
val count = data.get.count()
val df = if (tagConfig.vertexUdf.isDefined) {
dataUdf(data.get, tagConfig.vertexUdf.get)
} else {
data.get
}
df.cache()
val count = df.count()
val startTime = System.currentTimeMillis()
val batchSuccess =
spark.sparkContext.longAccumulator(s"batchSuccess.${tagConfig.name}")
@@ -152,7 +162,7 @@

val processor = new VerticesProcessor(
spark,
repartition(data.get, tagConfig.partition, tagConfig.dataSourceConfigEntry.category),
repartition(df, tagConfig.partition, tagConfig.dataSourceConfigEntry.category),
tagConfig,
fieldKeys,
nebulaKeys,
Expand All @@ -161,7 +171,7 @@ object Exchange {
batchFailure
)
processor.process()
data.get.unpersist()
df.unpersist()
val costTime = ((System.currentTimeMillis() - startTime) / 1000.0).formatted("%.2f")
LOG.info(
s"import for tag ${tagConfig.name}: data total count: $count, total time: ${costTime}s")
@@ -195,15 +205,23 @@ object Exchange {
}
val data = createDataSource(spark, edgeConfig.dataSourceConfigEntry, fields)
if (data.isDefined && !c.dry) {
data.get.cache()
val count = data.get.count()
var df = data.get
if (edgeConfig.srcVertexUdf.isDefined) {
df = dataUdf(df, edgeConfig.srcVertexUdf.get)
}
if (edgeConfig.dstVertexUdf.isDefined) {
df = dataUdf(df, edgeConfig.dstVertexUdf.get)
}

df.cache()
val count = df.count()
val startTime = System.currentTimeMillis()
val batchSuccess = spark.sparkContext.longAccumulator(s"batchSuccess.${edgeConfig.name}")
val batchFailure = spark.sparkContext.longAccumulator(s"batchFailure.${edgeConfig.name}")

val processor = new EdgeProcessor(
spark,
repartition(data.get, edgeConfig.partition, edgeConfig.dataSourceConfigEntry.category),
repartition(df, edgeConfig.partition, edgeConfig.dataSourceConfigEntry.category),
edgeConfig,
fieldKeys,
nebulaKeys,
Expand All @@ -212,7 +230,7 @@ object Exchange {
batchFailure
)
processor.process()
data.get.unpersist()
df.unpersist()
val costTime = ((System.currentTimeMillis() - startTime) / 1000.0).formatted("%.2f")
LOG.info(
s"import for edge ${edgeConfig.name}: data total count: $count, total time: ${costTime}s")
@@ -363,4 +381,17 @@ object Exchange {
frame
}
}

private[this] def dataUdf(data: DataFrame, udfConfig: UdfConfigEntry): DataFrame = {
val oldCols = udfConfig.oldColNames
val sep = udfConfig.sep
val newCol = udfConfig.newColName
val originalFieldsNames = data.schema.fieldNames.toList
val finalColNames: ListBuffer[Column] = new ListBuffer[Column]
for (field <- originalFieldsNames) {
finalColNames.append(col(field))
}
finalColNames.append(concat_ws(sep, oldCols.map(c => col(c)): _*).cast(StringType).as(newCol))
data.select(finalColNames: _*)
}
}
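For reference, dataUdf keeps every original column and appends one new string column produced by concat_ws over the configured columns. A self-contained sketch of the same transformation with hypothetical column names (prefix, id and vid are assumptions for the example, not part of the PR):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, concat_ws}
import org.apache.spark.sql.types.StringType

object UdfSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("udf-sketch").getOrCreate()
    import spark.implicits._

    // Hypothetical input with two id fragments per row.
    val df = Seq(("p", "100"), ("p", "200")).toDF("prefix", "id")

    // Equivalent of dataUdf(df, UdfConfigEntry("_", List("prefix", "id"), "vid")):
    // keep all original columns and append the concatenation, cast to string.
    val withVid = df.select(
      df.columns.map(col).toSeq :+
        concat_ws("_", col("prefix"), col("id")).cast(StringType).as("vid"): _*)

    withVid.show() // each row now carries prefix, id and the derived vid ("p_100", "p_200")
    spark.stop()
  }
}

In the application.conf example below, the vertex field is then pointed at the derived column name.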
18 changes: 17 additions & 1 deletion nebula-exchange_spark_2.4/src/main/resources/application.conf
@@ -97,10 +97,16 @@
sink: client
}
path: hdfs tag path 0

fields: [parquet-field-0, parquet-field-1, parquet-field-2]
nebula.fields: [nebula-field-0, nebula-field-1, nebula-field-2]
vertex: {
field:parquet-field-0
field:new-parquet-field
udf:{
separator:"_"
oldColNames:[parquet-field-0]
newColName:new-parquet-field
}
#policy:hash
}
batch: 2000
@@ -367,10 +373,20 @@
nebula.fields: [nebula-field-0, nebula-field-1, nebula-field-2]
source: {
field:parquet-field-0
udf:{
separator:"_"
oldColNames:[parquet-field-0]
newColName:new-parquet-field
}
#policy:hash
}
target: {
field:parquet-field-1
udf:{
separator:"_"
oldColNames:[parquet-field-0]
newColName:new-parquet-field
}
#policy:hash
}
ranking: parquet-field-2