Skip to content

Commit

Permalink
add overwrite option for migrate (#96)
Browse files Browse the repository at this point in the history
* add option for overwrite

* add Maven repositories to the example pom & print sync progress info

* fix excluding the specified tags and edges
  • Loading branch information
Nicole00 authored Apr 19, 2023
1 parent 4280cf4 commit e02c06d
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 11 deletions.
11 changes: 11 additions & 0 deletions example/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -168,4 +168,15 @@
<version>${project.version}</version>
</dependency>
</dependencies>

<repositories>
<repository>
<id>SparkPackagesRepo</id>
<url>https://repos.spark-packages.org</url>
</repository>
<repository>
<id>snapshots</id>
<url>https://oss.sonatype.org/content/repositories/snapshots/</url>
</repository>
</repositories>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ object Nebula2Nebula {
val passwdOption = new Option("passwd", "password", true, "password")
passwdOption.setRequired(true)

val overwriteOption =
new Option("o", "overwrite", true, "overwrite the old data, default is true")
// filter out some tags/edges
val excludeTagsOption =
new Option("excludeTags", "excludeTags", true, "filter out these tags, separate with `,`")
Expand All @@ -102,6 +104,7 @@ object Nebula2Nebula {
options.addOption(passwdOption)
options.addOption(excludeTagsOption)
options.addOption(excludeEdgesOption)
options.addOption(overwriteOption)

var cli: CommandLine = null
val cliParser: CommandLineParser = new DefaultParser
Expand Down Expand Up @@ -134,6 +137,9 @@ object Nebula2Nebula {
if (cli.hasOption("excludeEdges")) cli.getOptionValue("excludeEdges").split(",").toList
else List()

val overwrite: Boolean =
if (cli.hasOption("o")) cli.getOptionValue("o").toBoolean else true

// common config
val sourceConnectConfig =
NebulaConnectionConfig
Expand All @@ -156,14 +162,28 @@ object Nebula2Nebula {
var (tags, edges, partitions) =
getTagsAndEdges(metaHostAndPort(0), metaHostAndPort(1).toInt, sourceSpace)

if (excludeTags.nonEmpty) {
tags = tags.dropWhile(ele => excludeTags.contains(ele))
val syncTags = new ListBuffer[String]

println(s"source space tags: ${tags}")
println(s"exclude tags: ${excludeTags}")
for (i <- tags.indices) {
if (!excludeTags.contains(tags(i))) {
syncTags.append(tags(i))
}
}
if (excludeEdges.nonEmpty) {
edges = edges.dropWhile(ele => excludeEdges.contains(ele))
println(s"tags need to sync: ${syncTags}")

val syncEdges = new ListBuffer[String]
println(s"source space edges: ${edges}")
println(s"exclude edges: ${excludeEdges}")
for (i <- edges.indices) {
if (!excludeEdges.contains(edges(i))) {
syncEdges.append(edges(i))
}
}
println(s"edges need to sync: ${syncEdges}")

tags.foreach(tag => {
syncTags.foreach(tag => {
syncTag(spark,
sourceConnectConfig,
sourceSpace,
Expand All @@ -175,10 +195,11 @@ object Nebula2Nebula {
tag,
parallel,
user,
passed)
passed,
overwrite)
})

edges.foreach(edge => {
syncEdges.foreach(edge => {
syncEdge(spark,
sourceConnectConfig,
sourceSpace,
Expand All @@ -190,7 +211,8 @@ object Nebula2Nebula {
edge,
parallel,
user,
passed)
passed,
overwrite)
})
}

Expand All @@ -211,7 +233,7 @@ object Nebula2Nebula {
}

val partitions = metaClient.getPartsAlloc(space).size()
(tags.toList, edges.toList, partitions)
(tags.toList.distinct, edges.toList.distinct, partitions)
}

def syncTag(spark: SparkSession,
Expand All @@ -225,7 +247,9 @@ object Nebula2Nebula {
tag: String,
writeParallel: Int,
user: String,
passwd: String): Unit = {
passwd: String,
overwrite: Boolean): Unit = {
println(s" >>>>>> start to sync tag ${tag}")
val nebulaReadVertexConfig: ReadNebulaConfig = ReadNebulaConfig
.builder()
.withSpace(sourceSpace)
Expand All @@ -248,6 +272,7 @@ object Nebula2Nebula {
.withTag(tag)
.withVidField("_vertexId")
.withBatch(batch)
.withOverwrite(overwrite)
.build()
vertex.write.nebula(targetConfig, nebulaWriteVertexConfig).writeVertices()

Expand All @@ -264,7 +289,9 @@ object Nebula2Nebula {
edge: String,
writeParallel: Int,
user: String,
passwd: String): Unit = {
passwd: String,
overwrite: Boolean): Unit = {
println(s" >>>>>> start to sync edge ${edge}")
val nebulaReadEdgeConfig: ReadNebulaConfig = ReadNebulaConfig
.builder()
.withSpace(sourceSpace)
Expand All @@ -289,6 +316,7 @@ object Nebula2Nebula {
.withDstIdField("_dstId")
.withRankField("_rank")
.withBatch(batch)
.withOverwrite(overwrite)
.build()
edgeDf.write.nebula(targetConfig, nebulaWriteEdgeConfig).writeEdges()
}
Expand Down

0 comments on commit e02c06d

Please sign in to comment.