From 4ac601eea6d49d53cb6e92211e1899322b308694 Mon Sep 17 00:00:00 2001 From: Nicole00 Date: Tue, 26 Oct 2021 19:45:28 +0800 Subject: [PATCH 1/8] rebase master --- .../nebula/connection/NebulaGraphConnectionProviderTest.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/connector/src/test/java/org/apache/flink/connector/nebula/connection/NebulaGraphConnectionProviderTest.java b/connector/src/test/java/org/apache/flink/connector/nebula/connection/NebulaGraphConnectionProviderTest.java index c50d41c..a4df688 100644 --- a/connector/src/test/java/org/apache/flink/connector/nebula/connection/NebulaGraphConnectionProviderTest.java +++ b/connector/src/test/java/org/apache/flink/connector/nebula/connection/NebulaGraphConnectionProviderTest.java @@ -1,8 +1,11 @@ +<<<<<<< HEAD /* Copyright (c) 2021 vesoft inc. All rights reserved. * * This source code is licensed under Apache 2.0 License. */ +======= +>>>>>>> add test package org.apache.flink.connector.nebula.connection; import com.vesoft.nebula.client.graph.exception.AuthFailedException; From eb95610c125f4f04623b15f5d76d94ba280bf801 Mon Sep 17 00:00:00 2001 From: Nicole00 Date: Wed, 27 Oct 2021 11:25:42 +0800 Subject: [PATCH 2/8] refactor ssl name --- .../nebula/connection/NebulaGraphConnectionProviderTest.java | 3 --- 1 file changed, 3 deletions(-) diff --git a/connector/src/test/java/org/apache/flink/connector/nebula/connection/NebulaGraphConnectionProviderTest.java b/connector/src/test/java/org/apache/flink/connector/nebula/connection/NebulaGraphConnectionProviderTest.java index a4df688..c50d41c 100644 --- a/connector/src/test/java/org/apache/flink/connector/nebula/connection/NebulaGraphConnectionProviderTest.java +++ b/connector/src/test/java/org/apache/flink/connector/nebula/connection/NebulaGraphConnectionProviderTest.java @@ -1,11 +1,8 @@ -<<<<<<< HEAD /* Copyright (c) 2021 vesoft inc. All rights reserved. * * This source code is licensed under Apache 2.0 License. */ -======= ->>>>>>> add test package org.apache.flink.connector.nebula.connection; import com.vesoft.nebula.client.graph.exception.AuthFailedException; From b6cb654ce641b6c4eca10ee69fdac7ae53f5a3f1 Mon Sep 17 00:00:00 2001 From: Nicole00 Date: Mon, 1 Nov 2021 09:41:18 +0800 Subject: [PATCH 3/8] fix NebulaPool's not serializable --- .../connector/nebula/sink/NebulaBatchOutputFormat.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/connector/src/main/java/org.apache.flink/connector/nebula/sink/NebulaBatchOutputFormat.java b/connector/src/main/java/org.apache.flink/connector/nebula/sink/NebulaBatchOutputFormat.java index bb2646e..998664e 100644 --- a/connector/src/main/java/org.apache.flink/connector/nebula/sink/NebulaBatchOutputFormat.java +++ b/connector/src/main/java/org.apache.flink/connector/nebula/sink/NebulaBatchOutputFormat.java @@ -75,7 +75,7 @@ public void open(int i, int i1) throws IOException { } catch (UnknownHostException | NotValidConnectionException | AuthFailedException | ClientServerIncompatibleException | IOErrorException e) { LOG.error("failed to get graph session, ", e); - throw new IOException("get graph session error, ", e); + throw new IOException("get graph pool session, ", e); } ResultSet resultSet; try { From 480d62a125aedb8936026e5c6e08e751fae42e0f Mon Sep 17 00:00:00 2001 From: Nicole00 Date: Mon, 1 Nov 2021 14:56:28 +0800 Subject: [PATCH 4/8] fix typo --- .../connector/nebula/sink/NebulaBatchOutputFormat.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/connector/src/main/java/org.apache.flink/connector/nebula/sink/NebulaBatchOutputFormat.java b/connector/src/main/java/org.apache.flink/connector/nebula/sink/NebulaBatchOutputFormat.java index 998664e..bb2646e 100644 --- a/connector/src/main/java/org.apache.flink/connector/nebula/sink/NebulaBatchOutputFormat.java +++ b/connector/src/main/java/org.apache.flink/connector/nebula/sink/NebulaBatchOutputFormat.java @@ -75,7 +75,7 @@ public void open(int i, int i1) throws IOException { } catch (UnknownHostException | NotValidConnectionException | AuthFailedException | ClientServerIncompatibleException | IOErrorException e) { LOG.error("failed to get graph session, ", e); - throw new IOException("get graph pool session, ", e); + throw new IOException("get graph session error, ", e); } ResultSet resultSet; try { From 95d6a3974d390727e84bf0a17ca1eb65369f7acd Mon Sep 17 00:00:00 2001 From: Nicole00 Date: Thu, 18 Nov 2021 11:50:33 +0800 Subject: [PATCH 5/8] support ssl for flink connector --- .../nebula/connection/CASignParams.java | 54 ++++++++++++++ .../connection/NebulaClientOptions.java | 70 +++++++++++-------- .../NebulaGraphConnectionProvider.java | 24 ++++--- .../NebulaMetaConnectionProvider.java | 32 ++++++++- .../NebulaStorageConnectionProvider.java | 32 ++++++++- .../nebula/connection/SelfSignParams.java | 54 ++++++++++++++ 6 files changed, 225 insertions(+), 41 deletions(-) create mode 100644 connector/src/main/java/org.apache.flink/connector/nebula/connection/CASignParams.java create mode 100644 connector/src/main/java/org.apache.flink/connector/nebula/connection/SelfSignParams.java diff --git a/connector/src/main/java/org.apache.flink/connector/nebula/connection/CASignParams.java b/connector/src/main/java/org.apache.flink/connector/nebula/connection/CASignParams.java new file mode 100644 index 0000000..ba5572e --- /dev/null +++ b/connector/src/main/java/org.apache.flink/connector/nebula/connection/CASignParams.java @@ -0,0 +1,54 @@ +/* Copyright (c) 2021 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License. + */ + +package org.apache.flink.connector.nebula.connection; + +import java.io.Serializable; + +public class CASignParams implements Serializable { + + private String caCrtFilePath; + private String crtFilePath; + private String keyFilePath; + + public CASignParams(String caCrtFilePath, String crtFilePath, String keyFilePath) { + this.caCrtFilePath = caCrtFilePath; + this.crtFilePath = crtFilePath; + this.keyFilePath = keyFilePath; + } + + public String getCaCrtFilePath() { + return caCrtFilePath; + } + + public void setCaCrtFilePath(String caCrtFilePath) { + this.caCrtFilePath = caCrtFilePath; + } + + public String getCrtFilePath() { + return crtFilePath; + } + + public void setCrtFilePath(String crtFilePath) { + this.crtFilePath = crtFilePath; + } + + public String getKeyFilePath() { + return keyFilePath; + } + + public void setKeyFilePath(String keyFilePath) { + this.keyFilePath = keyFilePath; + } + + @Override + public String toString() { + return "CASSLSignParams{" + + "caCrtFilePath='" + caCrtFilePath + '\'' + + ", crtFilePath='" + crtFilePath + '\'' + + ", keyFilePath='" + keyFilePath + '\'' + + '}'; + } +} diff --git a/connector/src/main/java/org.apache.flink/connector/nebula/connection/NebulaClientOptions.java b/connector/src/main/java/org.apache.flink/connector/nebula/connection/NebulaClientOptions.java index df3c072..6b44640 100644 --- a/connector/src/main/java/org.apache.flink/connector/nebula/connection/NebulaClientOptions.java +++ b/connector/src/main/java/org.apache.flink/connector/nebula/connection/NebulaClientOptions.java @@ -5,9 +5,7 @@ package org.apache.flink.connector.nebula.connection; -import com.vesoft.nebula.client.graph.data.CASignedSSLParam; import com.vesoft.nebula.client.graph.data.HostAddress; -import com.vesoft.nebula.client.graph.data.SelfSignedSSLParam; import java.io.Serializable; import java.util.ArrayList; import java.util.List; @@ -34,18 +32,21 @@ public class NebulaClientOptions implements Serializable { private final boolean enableMetaSSL; + private final boolean enableStorageSSL; + private final SSLSighType sslSighType; - private final CASignedSSLParam caSignParam; + private final CASignParams caSSLSignParams; - private final SelfSignedSSLParam selfSignParam; + private final SelfSignParams selfSignParams; private NebulaClientOptions(String metaAddress, String graphAddress, String username, String password, int timeout, int connectRetry, boolean enableGraphSSL, boolean enableMetaSSL, - SSLSighType sslSighType, CASignedSSLParam caSignParam, - SelfSignedSSLParam selfSignParam) { + boolean enableStorageSSL, + SSLSighType sslSighType, CASignParams caSSLSignParams, + SelfSignParams selfSignParams) { this.metaAddress = metaAddress; this.graphAddress = graphAddress; this.username = username; @@ -54,9 +55,10 @@ private NebulaClientOptions(String metaAddress, String graphAddress, String user this.connectRetry = connectRetry; this.enableGraphSSL = enableGraphSSL; this.enableMetaSSL = enableMetaSSL; + this.enableStorageSSL = enableStorageSSL; this.sslSighType = sslSighType; - this.caSignParam = caSignParam; - this.selfSignParam = selfSignParam; + this.caSSLSignParams = caSSLSignParams; + this.selfSignParams = selfSignParams; } public List getMetaAddress() { @@ -96,16 +98,20 @@ public boolean isEnableMetaSSL() { return enableMetaSSL; } + public boolean isEnableStorageSSL() { + return enableStorageSSL; + } + public SSLSighType getSSLSighType() { return sslSighType; } - public CASignedSSLParam getCaSignParam() { - return caSignParam; + public CASignParams getCaSignParam() { + return caSSLSignParams; } - public SelfSignedSSLParam getSelfSignParam() { - return selfSignParam; + public SelfSignParams getSelfSignParam() { + return selfSignParams; } /** @@ -122,9 +128,10 @@ public static class NebulaClientOptionsBuilder { // ssl options private boolean enableGraphSSL = false; private boolean enableMetaSSL = false; + private boolean enableStorageSSL = false; private SSLSighType sslSighType = null; - private CASignedSSLParam caSignParam = null; - private SelfSignedSSLParam selfSignParam = null; + private CASignParams caSSLSignParams = null; + private SelfSignParams selfSignParams = null; public NebulaClientOptionsBuilder setMetaAddress(String metaAddress) { this.metaAddress = metaAddress; @@ -166,6 +173,12 @@ public NebulaClientOptionsBuilder setEnableMetaSSL(boolean enableMetaSSL) { return this; } + public NebulaClientOptionsBuilder setEnableStorageSSL(boolean enableStorageSSL) { + this.enableStorageSSL = enableStorageSSL; + return this; + } + + public NebulaClientOptionsBuilder setSSLSignType(SSLSighType sslSighType) { this.sslSighType = sslSighType; return this; @@ -173,14 +186,13 @@ public NebulaClientOptionsBuilder setSSLSignType(SSLSighType sslSighType) { public NebulaClientOptionsBuilder setCaSignParam(String caCrtFilePath, String crtFilePath, String keyFilePath) { - this.caSignParam = new CASignedSSLParam(caCrtFilePath, crtFilePath, - keyFilePath); + this.caSSLSignParams = new CASignParams(caCrtFilePath, crtFilePath, keyFilePath); return this; } public NebulaClientOptionsBuilder setSelfSignParam(String crtFilePath, String keyFilePath, String password) { - this.selfSignParam = new SelfSignedSSLParam(crtFilePath, keyFilePath, password); + this.selfSignParams = new SelfSignParams(crtFilePath, keyFilePath, password); return this; } @@ -188,26 +200,27 @@ public NebulaClientOptions build() { if (metaAddress == null || metaAddress.trim().isEmpty()) { throw new IllegalArgumentException("meta address can not be empty."); } - if (enableMetaSSL || enableGraphSSL) { - // if meta is set to open ssl, then graph must be set to open ssl - if (enableMetaSSL && !enableGraphSSL) { + if (enableMetaSSL || enableGraphSSL || enableStorageSSL) { + // if storage is set to open ssl, then meta must be set to open ssl + if (enableStorageSSL && !enableMetaSSL) { throw new IllegalArgumentException( - "meta ssl is enable, graph ssl must be enable"); + "storage ssl is enabled, meta ssl must be enabled."); } + if (sslSighType == null) { - throw new IllegalArgumentException("ssl is enable, ssl sign type must not be " + throw new IllegalArgumentException("ssl is enabled, ssl sign type must not be " + "null"); } switch (sslSighType) { case CA: - if (caSignParam == null) { - throw new IllegalArgumentException("ssl is enable and sign type is " + if (caSSLSignParams == null) { + throw new IllegalArgumentException("ssl is enabled and sign type is " + "CA, caSignParam must not be null"); } break; case SELF: - if (selfSignParam == null) { - throw new IllegalArgumentException("ssl is enable and sign type is " + if (selfSignParams == null) { + throw new IllegalArgumentException("ssl is enabled and sign type is " + "CA, selfSignParam must not be null"); } break; @@ -226,9 +239,10 @@ public NebulaClientOptions build() { connectRetry, enableGraphSSL, enableMetaSSL, + enableStorageSSL, sslSighType, - caSignParam, - selfSignParam); + caSSLSignParams, + selfSignParams); } } } diff --git a/connector/src/main/java/org.apache.flink/connector/nebula/connection/NebulaGraphConnectionProvider.java b/connector/src/main/java/org.apache.flink/connector/nebula/connection/NebulaGraphConnectionProvider.java index 830d24c..af9b99c 100644 --- a/connector/src/main/java/org.apache.flink/connector/nebula/connection/NebulaGraphConnectionProvider.java +++ b/connector/src/main/java/org.apache.flink/connector/nebula/connection/NebulaGraphConnectionProvider.java @@ -7,13 +7,11 @@ import com.vesoft.nebula.client.graph.NebulaPoolConfig; +import com.vesoft.nebula.client.graph.data.CASignedSSLParam; import com.vesoft.nebula.client.graph.data.HostAddress; -import com.vesoft.nebula.client.graph.exception.AuthFailedException; -import com.vesoft.nebula.client.graph.exception.ClientServerIncompatibleException; -import com.vesoft.nebula.client.graph.exception.IOErrorException; -import com.vesoft.nebula.client.graph.exception.NotValidConnectionException; +import com.vesoft.nebula.client.graph.data.SSLParam; +import com.vesoft.nebula.client.graph.data.SelfSignedSSLParam; import com.vesoft.nebula.client.graph.net.NebulaPool; -import com.vesoft.nebula.client.graph.net.Session; import java.io.Serializable; import java.net.UnknownHostException; import java.util.ArrayList; @@ -51,12 +49,20 @@ public NebulaPool getNebulaPool() throws UnknownHostException { if (nebulaClientOptions.isEnableGraphSSL()) { poolConfig.setEnableSsl(true); switch (nebulaClientOptions.getSSLSighType()) { - case CA: - poolConfig.setSslParam(nebulaClientOptions.getCaSignParam()); + case CA: { + CASignParams caSignParams = nebulaClientOptions.getCaSignParam(); + SSLParam sslParam = new CASignedSSLParam(caSignParams.getCaCrtFilePath(), + caSignParams.getCrtFilePath(), caSignParams.getKeyFilePath()); + poolConfig.setSslParam(sslParam); break; - case SELF: - poolConfig.setSslParam(nebulaClientOptions.getSelfSignParam()); + } + case SELF: { + SelfSignParams selfSignParams = nebulaClientOptions.getSelfSignParam(); + SSLParam sslParam = new SelfSignedSSLParam(selfSignParams.getCrtFilePath(), + selfSignParams.getKeyFilePath(), selfSignParams.getPassword()); + poolConfig.setSslParam(sslParam); break; + } default: throw new IllegalArgumentException("ssl sign type is not supported."); } diff --git a/connector/src/main/java/org.apache.flink/connector/nebula/connection/NebulaMetaConnectionProvider.java b/connector/src/main/java/org.apache.flink/connector/nebula/connection/NebulaMetaConnectionProvider.java index 1fb7499..09e3c38 100644 --- a/connector/src/main/java/org.apache.flink/connector/nebula/connection/NebulaMetaConnectionProvider.java +++ b/connector/src/main/java/org.apache.flink/connector/nebula/connection/NebulaMetaConnectionProvider.java @@ -6,7 +6,10 @@ package org.apache.flink.connector.nebula.connection; import com.facebook.thrift.TException; +import com.vesoft.nebula.client.graph.data.CASignedSSLParam; import com.vesoft.nebula.client.graph.data.HostAddress; +import com.vesoft.nebula.client.graph.data.SSLParam; +import com.vesoft.nebula.client.graph.data.SelfSignedSSLParam; import com.vesoft.nebula.client.graph.exception.ClientServerIncompatibleException; import com.vesoft.nebula.client.meta.MetaClient; import com.vesoft.nebula.client.meta.exception.ExecuteFailedException; @@ -15,11 +18,9 @@ import com.vesoft.nebula.meta.Schema; import com.vesoft.nebula.meta.SpaceItem; import java.io.Serializable; -import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; -import org.apache.flink.connector.nebula.utils.NebulaConstant; import org.apache.flink.connector.nebula.utils.VidTypeEnum; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -36,7 +37,32 @@ public NebulaMetaConnectionProvider(NebulaClientOptions nebulaClientOptions) { public MetaClient getMetaClient() throws TException, ClientServerIncompatibleException { List addresses = nebulaClientOptions.getMetaAddress(); - MetaClient metaClient = new MetaClient(addresses); + int timeout = nebulaClientOptions.getTimeout(); + int retry = nebulaClientOptions.getConnectRetry(); + MetaClient metaClient; + if (nebulaClientOptions.isEnableMetaSSL()) { + switch (nebulaClientOptions.getSSLSighType()) { + case CA: { + CASignParams caSignParams = nebulaClientOptions.getCaSignParam(); + SSLParam sslParam = new CASignedSSLParam(caSignParams.getCaCrtFilePath(), + caSignParams.getCrtFilePath(), caSignParams.getKeyFilePath()); + metaClient = new MetaClient(addresses, timeout, retry, retry, true, sslParam); + break; + } + case SELF: { + SelfSignParams selfSignParams = nebulaClientOptions.getSelfSignParam(); + SSLParam sslParam = new SelfSignedSSLParam(selfSignParams.getCrtFilePath(), + selfSignParams.getKeyFilePath(), selfSignParams.getPassword()); + metaClient = new MetaClient(addresses, timeout, retry, retry, true, sslParam); + break; + } + default: + throw new IllegalArgumentException("ssl sign type is not supported."); + } + } else { + metaClient = new MetaClient(addresses, timeout, retry, retry); + } + metaClient.connect(); return metaClient; } diff --git a/connector/src/main/java/org.apache.flink/connector/nebula/connection/NebulaStorageConnectionProvider.java b/connector/src/main/java/org.apache.flink/connector/nebula/connection/NebulaStorageConnectionProvider.java index 9c7f908..ed12269 100644 --- a/connector/src/main/java/org.apache.flink/connector/nebula/connection/NebulaStorageConnectionProvider.java +++ b/connector/src/main/java/org.apache.flink/connector/nebula/connection/NebulaStorageConnectionProvider.java @@ -6,7 +6,10 @@ package org.apache.flink.connector.nebula.connection; +import com.vesoft.nebula.client.graph.data.CASignedSSLParam; import com.vesoft.nebula.client.graph.data.HostAddress; +import com.vesoft.nebula.client.graph.data.SSLParam; +import com.vesoft.nebula.client.graph.data.SelfSignedSSLParam; import com.vesoft.nebula.client.storage.StorageClient; import java.io.Serializable; import java.util.List; @@ -26,7 +29,34 @@ public NebulaStorageConnectionProvider() { public StorageClient getStorageClient() throws Exception { List addresses = nebulaClientOptions.getMetaAddress(); - StorageClient storageClient = new StorageClient(addresses); + int timeout = nebulaClientOptions.getTimeout(); + int retry = nebulaClientOptions.getConnectRetry(); + StorageClient storageClient; + if (nebulaClientOptions.isEnableStorageSSL()) { + switch (nebulaClientOptions.getSSLSighType()) { + case CA: { + CASignParams caSignParams = nebulaClientOptions.getCaSignParam(); + SSLParam sslParam = new CASignedSSLParam(caSignParams.getCaCrtFilePath(), + caSignParams.getCrtFilePath(), caSignParams.getKeyFilePath()); + storageClient = new StorageClient(addresses, timeout, retry, retry, true, + sslParam); + break; + } + case SELF: { + SelfSignParams selfSignParams = nebulaClientOptions.getSelfSignParam(); + SSLParam sslParam = new SelfSignedSSLParam(selfSignParams.getCrtFilePath(), + selfSignParams.getKeyFilePath(), selfSignParams.getPassword()); + storageClient = new StorageClient(addresses, timeout, retry, retry, true, + sslParam); + break; + } + default: + throw new IllegalArgumentException("ssl sign type is not supported."); + } + } else { + storageClient = new StorageClient(addresses, timeout); + } + if (!storageClient.connect()) { throw new Exception("failed to connect storaged."); } diff --git a/connector/src/main/java/org.apache.flink/connector/nebula/connection/SelfSignParams.java b/connector/src/main/java/org.apache.flink/connector/nebula/connection/SelfSignParams.java new file mode 100644 index 0000000..5e42ed5 --- /dev/null +++ b/connector/src/main/java/org.apache.flink/connector/nebula/connection/SelfSignParams.java @@ -0,0 +1,54 @@ +/* Copyright (c) 2021 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License. + */ + +package org.apache.flink.connector.nebula.connection; + +import java.io.Serializable; + +public class SelfSignParams implements Serializable { + + private String crtFilePath; + private String keyFilePath; + private String password; + + public SelfSignParams(String crtFilePath, String keyFilePath, String password) { + this.crtFilePath = crtFilePath; + this.keyFilePath = keyFilePath; + this.password = password; + } + + public String getCrtFilePath() { + return crtFilePath; + } + + public void setCrtFilePath(String crtFilePath) { + this.crtFilePath = crtFilePath; + } + + public String getKeyFilePath() { + return keyFilePath; + } + + public void setKeyFilePath(String keyFilePath) { + this.keyFilePath = keyFilePath; + } + + public String getPassword() { + return password; + } + + public void setPassword(String password) { + this.password = password; + } + + @Override + public String toString() { + return "SelfSSLSignParams{" + + "crtFilePath='" + crtFilePath + '\'' + + ", keyFilePath='" + keyFilePath + '\'' + + ", password='" + password + '\'' + + '}'; + } +} From a3343d7bf16ff409eb17bb0d229ffe81e4b89dd8 Mon Sep 17 00:00:00 2001 From: Nicole00 Date: Thu, 18 Nov 2021 11:50:51 +0800 Subject: [PATCH 6/8] add example for SSL use --- .../apache/flink/FlinkConnectorExample.java | 1 + .../flink/FlinkConnectorSourceExample.java | 27 +++++++++++++++++++ 2 files changed, 28 insertions(+) diff --git a/example/src/main/java/org/apache/flink/FlinkConnectorExample.java b/example/src/main/java/org/apache/flink/FlinkConnectorExample.java index 37d4463..418a62b 100644 --- a/example/src/main/java/org/apache/flink/FlinkConnectorExample.java +++ b/example/src/main/java/org/apache/flink/FlinkConnectorExample.java @@ -45,6 +45,7 @@ public class FlinkConnectorExample { public static void main(String[] args) { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(5); DataStream> playerSource = constructVertexSourceData(env); sinkVertexData(env, playerSource); updateVertexData(env, playerSource); diff --git a/example/src/main/java/org/apache/flink/FlinkConnectorSourceExample.java b/example/src/main/java/org/apache/flink/FlinkConnectorSourceExample.java index 9c1ffb5..f0040b4 100644 --- a/example/src/main/java/org/apache/flink/FlinkConnectorSourceExample.java +++ b/example/src/main/java/org/apache/flink/FlinkConnectorSourceExample.java @@ -19,6 +19,7 @@ import org.apache.flink.connector.nebula.statement.EdgeExecutionOptions; import org.apache.flink.connector.nebula.statement.ExecutionOptions; import org.apache.flink.connector.nebula.statement.VertexExecutionOptions; +import org.apache.flink.connector.nebula.utils.SSLSighType; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.types.Row; @@ -47,6 +48,8 @@ public class FlinkConnectorSourceExample { private static final Logger LOG = LoggerFactory.getLogger(FlinkConnectorSourceExample.class); private static NebulaStorageConnectionProvider storageConnectionProvider; + private static NebulaStorageConnectionProvider storageConnectionProviderCaSSL; + private static NebulaStorageConnectionProvider storageConnectionProviderSelfSSL; private static ExecutionOptions vertexExecutionOptions; private static ExecutionOptions edgeExecutionOptions; @@ -74,6 +77,30 @@ public static void initConfig() { storageConnectionProvider = new NebulaStorageConnectionProvider(nebulaClientOptions); + NebulaClientOptions nebulaClientOptionsWithCaSSL = + new NebulaClientOptions.NebulaClientOptionsBuilder() + .setEnableMetaSSL(true) + .setEnableStorageSSL(true) + .setSSLSignType(SSLSighType.CA) + .setCaSignParam("example/src/main/resources/ssl/casigned.pem", + "example/src/main/resources/ssl/casigned.crt", + "example/src/main/resources/ssl/casigned.key") + .build(); + storageConnectionProviderCaSSL = + new NebulaStorageConnectionProvider(nebulaClientOptionsWithCaSSL); + + NebulaClientOptions nebulaClientOptionsWithSelfSSL = + new NebulaClientOptions.NebulaClientOptionsBuilder() + .setEnableMetaSSL(true) + .setEnableStorageSSL(true) + .setSSLSignType(SSLSighType.SELF) + .setCaSignParam("example/src/main/resources/ssl/selfsigned.pem", + "example/src/main/resources/ssl/selfsigned.key", + "vesoft") + .build(); + storageConnectionProviderSelfSSL = + new NebulaStorageConnectionProvider(nebulaClientOptionsWithSelfSSL); + // read no property vertexExecutionOptions = new VertexExecutionOptions.ExecutionOptionBuilder() .setGraphSpace("flinkSource") From d1509c33339b1f2b00c7c472ed64b7a433c2b94f Mon Sep 17 00:00:00 2001 From: Nicole00 Date: Thu, 18 Nov 2021 15:02:17 +0800 Subject: [PATCH 7/8] fix test --- .../connector/nebula/connection/NebulaClientOptionsTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/connector/src/test/java/org/apache/flink/connector/nebula/connection/NebulaClientOptionsTest.java b/connector/src/test/java/org/apache/flink/connector/nebula/connection/NebulaClientOptionsTest.java index 8275dca..7780659 100644 --- a/connector/src/test/java/org/apache/flink/connector/nebula/connection/NebulaClientOptionsTest.java +++ b/connector/src/test/java/org/apache/flink/connector/nebula/connection/NebulaClientOptionsTest.java @@ -51,8 +51,8 @@ public void testIsEnableGraphSsl() { .NebulaClientOptionsBuilder() .setGraphAddress(null) .setMetaAddress("127.0.0.1:9559") - .setEnableGraphSSL(false) - .setEnableMetaSSL(true) + .setEnableMetaSSL(false) + .setEnableStorageSSL(true) .setSSLSignType(SSLSighType.CA) .setCaSignParam("caCrtFile", "crtFile", "keyFile") .build(); From 7d355b4d8e746ed543a790c6a15c20a14b15d9c9 Mon Sep 17 00:00:00 2001 From: Nicole00 Date: Thu, 18 Nov 2021 17:43:28 +0800 Subject: [PATCH 8/8] fix var name --- .../nebula/connection/NebulaClientOptions.java | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/connector/src/main/java/org.apache.flink/connector/nebula/connection/NebulaClientOptions.java b/connector/src/main/java/org.apache.flink/connector/nebula/connection/NebulaClientOptions.java index 6b44640..3fa9d3f 100644 --- a/connector/src/main/java/org.apache.flink/connector/nebula/connection/NebulaClientOptions.java +++ b/connector/src/main/java/org.apache.flink/connector/nebula/connection/NebulaClientOptions.java @@ -36,7 +36,7 @@ public class NebulaClientOptions implements Serializable { private final SSLSighType sslSighType; - private final CASignParams caSSLSignParams; + private final CASignParams caSignParams; private final SelfSignParams selfSignParams; @@ -45,7 +45,7 @@ private NebulaClientOptions(String metaAddress, String graphAddress, String user String password, int timeout, int connectRetry, boolean enableGraphSSL, boolean enableMetaSSL, boolean enableStorageSSL, - SSLSighType sslSighType, CASignParams caSSLSignParams, + SSLSighType sslSighType, CASignParams caSignParams, SelfSignParams selfSignParams) { this.metaAddress = metaAddress; this.graphAddress = graphAddress; @@ -57,7 +57,7 @@ private NebulaClientOptions(String metaAddress, String graphAddress, String user this.enableMetaSSL = enableMetaSSL; this.enableStorageSSL = enableStorageSSL; this.sslSighType = sslSighType; - this.caSSLSignParams = caSSLSignParams; + this.caSignParams = caSignParams; this.selfSignParams = selfSignParams; } @@ -107,7 +107,7 @@ public SSLSighType getSSLSighType() { } public CASignParams getCaSignParam() { - return caSSLSignParams; + return caSignParams; } public SelfSignParams getSelfSignParam() { @@ -130,7 +130,7 @@ public static class NebulaClientOptionsBuilder { private boolean enableMetaSSL = false; private boolean enableStorageSSL = false; private SSLSighType sslSighType = null; - private CASignParams caSSLSignParams = null; + private CASignParams caSignParams = null; private SelfSignParams selfSignParams = null; public NebulaClientOptionsBuilder setMetaAddress(String metaAddress) { @@ -186,7 +186,7 @@ public NebulaClientOptionsBuilder setSSLSignType(SSLSighType sslSighType) { public NebulaClientOptionsBuilder setCaSignParam(String caCrtFilePath, String crtFilePath, String keyFilePath) { - this.caSSLSignParams = new CASignParams(caCrtFilePath, crtFilePath, keyFilePath); + this.caSignParams = new CASignParams(caCrtFilePath, crtFilePath, keyFilePath); return this; } @@ -213,7 +213,7 @@ public NebulaClientOptions build() { } switch (sslSighType) { case CA: - if (caSSLSignParams == null) { + if (caSignParams == null) { throw new IllegalArgumentException("ssl is enabled and sign type is " + "CA, caSignParam must not be null"); } @@ -241,7 +241,7 @@ public NebulaClientOptions build() { enableMetaSSL, enableStorageSSL, sslSighType, - caSSLSignParams, + caSignParams, selfSignParams); } }