From e02c06d13d7ee3f3831c6a4c180972c535cc1c6d Mon Sep 17 00:00:00 2001 From: Anqi Date: Wed, 19 Apr 2023 17:36:29 +0800 Subject: [PATCH] add overwrite option for migrate (#96) * add option for overwrite * add repositories & add some print info * fix for excluding the specific edges --- example/pom.xml | 11 ++++ .../examples/connector/Nebula2Nebula.scala | 50 +++++++++++++++---- 2 files changed, 50 insertions(+), 11 deletions(-) diff --git a/example/pom.xml b/example/pom.xml index ec54b5fb..7415c7a6 100644 --- a/example/pom.xml +++ b/example/pom.xml @@ -168,4 +168,15 @@ ${project.version} + + + + SparkPackagesRepo + https://repos.spark-packages.org + + + snapshots + https://oss.sonatype.org/content/repositories/snapshots/ + + diff --git a/example/src/main/scala/com/vesoft/nebula/examples/connector/Nebula2Nebula.scala b/example/src/main/scala/com/vesoft/nebula/examples/connector/Nebula2Nebula.scala index 2b0f0f8b..0437749f 100644 --- a/example/src/main/scala/com/vesoft/nebula/examples/connector/Nebula2Nebula.scala +++ b/example/src/main/scala/com/vesoft/nebula/examples/connector/Nebula2Nebula.scala @@ -82,6 +82,8 @@ object Nebula2Nebula { val passwdOption = new Option("passwd", "password", true, "password") passwdOption.setRequired(true) + val overwriteOption = + new Option("o", "overwrite", true, "overwrite the old data, default is true") // filter out some tags /edges val excludeTagsOption = new Option("excludeTags", "excludeTags", true, "filter out these tags, separate with `,`") @@ -102,6 +104,7 @@ object Nebula2Nebula { options.addOption(passwdOption) options.addOption(excludeTagsOption) options.addOption(excludeEdgesOption) + options.addOption(overwriteOption) var cli: CommandLine = null val cliParser: CommandLineParser = new DefaultParser @@ -134,6 +137,9 @@ object Nebula2Nebula { if (cli.hasOption("excludeEdges")) cli.getOptionValue("excludeEdges").split(",").toList else List() + val overwrite: Boolean = + if (cli.hasOption("o")) cli.getOptionValue("o").toBoolean else true + // common config val sourceConnectConfig = NebulaConnectionConfig @@ -156,14 +162,28 @@ object Nebula2Nebula { var (tags, edges, partitions) = getTagsAndEdges(metaHostAndPort(0), metaHostAndPort(1).toInt, sourceSpace) - if (excludeTags.nonEmpty) { - tags = tags.dropWhile(ele => excludeTags.contains(ele)) + val syncTags = new ListBuffer[String] + + println(s"source space tags: ${tags}") + println(s"exclude tags: ${excludeTags}") + for (i <- tags.indices) { + if (!excludeTags.contains(tags(i))) { + syncTags.append(tags(i)) + } } - if (excludeEdges.nonEmpty) { - edges = edges.dropWhile(ele => excludeEdges.contains(ele)) + println(s"tags need to sync: ${syncTags}") + + val syncEdges = new ListBuffer[String] + println(s"source space edges: ${edges}") + println(s"exclude edges: ${excludeEdges}") + for (i <- edges.indices) { + if (!excludeEdges.contains(edges(i))) { + syncEdges.append(edges(i)) + } } + println(s"edges need to sync: ${syncEdges}") - tags.foreach(tag => { + syncTags.foreach(tag => { syncTag(spark, sourceConnectConfig, sourceSpace, @@ -175,10 +195,11 @@ object Nebula2Nebula { tag, parallel, user, - passed) + passed, + overwrite) }) - edges.foreach(edge => { + syncEdges.foreach(edge => { syncEdge(spark, sourceConnectConfig, sourceSpace, @@ -190,7 +211,8 @@ object Nebula2Nebula { edge, parallel, user, - passed) + passed, + overwrite) }) } @@ -211,7 +233,7 @@ object Nebula2Nebula { } val partitions = metaClient.getPartsAlloc(space).size() - (tags.toList, edges.toList, partitions) + (tags.toList.distinct, edges.toList.distinct, partitions) } def syncTag(spark: SparkSession, @@ -225,7 +247,9 @@ object Nebula2Nebula { tag: String, writeParallel: Int, user: String, - passwd: String): Unit = { + passwd: String, + overwrite: Boolean): Unit = { + println(s" >>>>>> start to sync tag ${tag}") val nebulaReadVertexConfig: ReadNebulaConfig = ReadNebulaConfig .builder() .withSpace(sourceSpace) @@ -248,6 +272,7 @@ object Nebula2Nebula { .withTag(tag) .withVidField("_vertexId") .withBatch(batch) + .withOverwrite(overwrite) .build() vertex.write.nebula(targetConfig, nebulaWriteVertexConfig).writeVertices() @@ -264,7 +289,9 @@ object Nebula2Nebula { edge: String, writeParallel: Int, user: String, - passwd: String): Unit = { + passwd: String, + overwrite: Boolean): Unit = { + println(s" >>>>>> start to sync edge ${edge}") val nebulaReadEdgeConfig: ReadNebulaConfig = ReadNebulaConfig .builder() .withSpace(sourceSpace) @@ -289,6 +316,7 @@ object Nebula2Nebula { .withDstIdField("_dstId") .withRankField("_rank") .withBatch(batch) + .withOverwrite(overwrite) .build() edgeDf.write.nebula(targetConfig, nebulaWriteEdgeConfig).writeEdges() }