diff --git a/connector/src/main/java/org.apache.flink/connector/nebula/connection/CASignParams.java b/connector/src/main/java/org.apache.flink/connector/nebula/connection/CASignParams.java new file mode 100644 index 0000000..ba5572e --- /dev/null +++ b/connector/src/main/java/org.apache.flink/connector/nebula/connection/CASignParams.java @@ -0,0 +1,54 @@ +/* Copyright (c) 2021 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License. + */ + +package org.apache.flink.connector.nebula.connection; + +import java.io.Serializable; + +public class CASignParams implements Serializable { + + private String caCrtFilePath; + private String crtFilePath; + private String keyFilePath; + + public CASignParams(String caCrtFilePath, String crtFilePath, String keyFilePath) { + this.caCrtFilePath = caCrtFilePath; + this.crtFilePath = crtFilePath; + this.keyFilePath = keyFilePath; + } + + public String getCaCrtFilePath() { + return caCrtFilePath; + } + + public void setCaCrtFilePath(String caCrtFilePath) { + this.caCrtFilePath = caCrtFilePath; + } + + public String getCrtFilePath() { + return crtFilePath; + } + + public void setCrtFilePath(String crtFilePath) { + this.crtFilePath = crtFilePath; + } + + public String getKeyFilePath() { + return keyFilePath; + } + + public void setKeyFilePath(String keyFilePath) { + this.keyFilePath = keyFilePath; + } + + @Override + public String toString() { + return "CASSLSignParams{" + + "caCrtFilePath='" + caCrtFilePath + '\'' + + ", crtFilePath='" + crtFilePath + '\'' + + ", keyFilePath='" + keyFilePath + '\'' + + '}'; + } +} diff --git a/connector/src/main/java/org.apache.flink/connector/nebula/connection/NebulaClientOptions.java b/connector/src/main/java/org.apache.flink/connector/nebula/connection/NebulaClientOptions.java index df3c072..3fa9d3f 100644 --- a/connector/src/main/java/org.apache.flink/connector/nebula/connection/NebulaClientOptions.java +++ b/connector/src/main/java/org.apache.flink/connector/nebula/connection/NebulaClientOptions.java @@ -5,9 +5,7 @@ package org.apache.flink.connector.nebula.connection; -import com.vesoft.nebula.client.graph.data.CASignedSSLParam; import com.vesoft.nebula.client.graph.data.HostAddress; -import com.vesoft.nebula.client.graph.data.SelfSignedSSLParam; import java.io.Serializable; import java.util.ArrayList; import java.util.List; @@ -34,18 +32,21 @@ public class NebulaClientOptions implements Serializable { private final boolean enableMetaSSL; + private final boolean enableStorageSSL; + private final SSLSighType sslSighType; - private final CASignedSSLParam caSignParam; + private final CASignParams caSignParams; - private final SelfSignedSSLParam selfSignParam; + private final SelfSignParams selfSignParams; private NebulaClientOptions(String metaAddress, String graphAddress, String username, String password, int timeout, int connectRetry, boolean enableGraphSSL, boolean enableMetaSSL, - SSLSighType sslSighType, CASignedSSLParam caSignParam, - SelfSignedSSLParam selfSignParam) { + boolean enableStorageSSL, + SSLSighType sslSighType, CASignParams caSignParams, + SelfSignParams selfSignParams) { this.metaAddress = metaAddress; this.graphAddress = graphAddress; this.username = username; @@ -54,9 +55,10 @@ private NebulaClientOptions(String metaAddress, String graphAddress, String user this.connectRetry = connectRetry; this.enableGraphSSL = enableGraphSSL; this.enableMetaSSL = enableMetaSSL; + this.enableStorageSSL = enableStorageSSL; this.sslSighType = sslSighType; - this.caSignParam = caSignParam; - this.selfSignParam = selfSignParam; + this.caSignParams = caSignParams; + this.selfSignParams = selfSignParams; } public List getMetaAddress() { @@ -96,16 +98,20 @@ public boolean isEnableMetaSSL() { return enableMetaSSL; } + public boolean isEnableStorageSSL() { + return enableStorageSSL; + } + public SSLSighType getSSLSighType() { return sslSighType; } - public CASignedSSLParam getCaSignParam() { - return caSignParam; + public CASignParams getCaSignParam() { + return caSignParams; } - public SelfSignedSSLParam getSelfSignParam() { - return selfSignParam; + public SelfSignParams getSelfSignParam() { + return selfSignParams; } /** @@ -122,9 +128,10 @@ public static class NebulaClientOptionsBuilder { // ssl options private boolean enableGraphSSL = false; private boolean enableMetaSSL = false; + private boolean enableStorageSSL = false; private SSLSighType sslSighType = null; - private CASignedSSLParam caSignParam = null; - private SelfSignedSSLParam selfSignParam = null; + private CASignParams caSignParams = null; + private SelfSignParams selfSignParams = null; public NebulaClientOptionsBuilder setMetaAddress(String metaAddress) { this.metaAddress = metaAddress; @@ -166,6 +173,12 @@ public NebulaClientOptionsBuilder setEnableMetaSSL(boolean enableMetaSSL) { return this; } + public NebulaClientOptionsBuilder setEnableStorageSSL(boolean enableStorageSSL) { + this.enableStorageSSL = enableStorageSSL; + return this; + } + + public NebulaClientOptionsBuilder setSSLSignType(SSLSighType sslSighType) { this.sslSighType = sslSighType; return this; @@ -173,14 +186,13 @@ public NebulaClientOptionsBuilder setSSLSignType(SSLSighType sslSighType) { public NebulaClientOptionsBuilder setCaSignParam(String caCrtFilePath, String crtFilePath, String keyFilePath) { - this.caSignParam = new CASignedSSLParam(caCrtFilePath, crtFilePath, - keyFilePath); + this.caSignParams = new CASignParams(caCrtFilePath, crtFilePath, keyFilePath); return this; } public NebulaClientOptionsBuilder setSelfSignParam(String crtFilePath, String keyFilePath, String password) { - this.selfSignParam = new SelfSignedSSLParam(crtFilePath, keyFilePath, password); + this.selfSignParams = new SelfSignParams(crtFilePath, keyFilePath, password); return this; } @@ -188,26 +200,27 @@ public NebulaClientOptions build() { if (metaAddress == null || metaAddress.trim().isEmpty()) { throw new IllegalArgumentException("meta address can not be empty."); } - if (enableMetaSSL || enableGraphSSL) { - // if meta is set to open ssl, then graph must be set to open ssl - if (enableMetaSSL && !enableGraphSSL) { + if (enableMetaSSL || enableGraphSSL || enableStorageSSL) { + // if storage is set to open ssl, then meta must be set to open ssl + if (enableStorageSSL && !enableMetaSSL) { throw new IllegalArgumentException( - "meta ssl is enable, graph ssl must be enable"); + "storage ssl is enabled, meta ssl must be enabled."); } + if (sslSighType == null) { - throw new IllegalArgumentException("ssl is enable, ssl sign type must not be " + throw new IllegalArgumentException("ssl is enabled, ssl sign type must not be " + "null"); } switch (sslSighType) { case CA: - if (caSignParam == null) { - throw new IllegalArgumentException("ssl is enable and sign type is " + if (caSignParams == null) { + throw new IllegalArgumentException("ssl is enabled and sign type is " + "CA, caSignParam must not be null"); } break; case SELF: - if (selfSignParam == null) { - throw new IllegalArgumentException("ssl is enable and sign type is " + if (selfSignParams == null) { + throw new IllegalArgumentException("ssl is enabled and sign type is " + "CA, selfSignParam must not be null"); } break; @@ -226,9 +239,10 @@ public NebulaClientOptions build() { connectRetry, enableGraphSSL, enableMetaSSL, + enableStorageSSL, sslSighType, - caSignParam, - selfSignParam); + caSignParams, + selfSignParams); } } } diff --git a/connector/src/main/java/org.apache.flink/connector/nebula/connection/NebulaGraphConnectionProvider.java b/connector/src/main/java/org.apache.flink/connector/nebula/connection/NebulaGraphConnectionProvider.java index 830d24c..af9b99c 100644 --- a/connector/src/main/java/org.apache.flink/connector/nebula/connection/NebulaGraphConnectionProvider.java +++ b/connector/src/main/java/org.apache.flink/connector/nebula/connection/NebulaGraphConnectionProvider.java @@ -7,13 +7,11 @@ import com.vesoft.nebula.client.graph.NebulaPoolConfig; +import com.vesoft.nebula.client.graph.data.CASignedSSLParam; import com.vesoft.nebula.client.graph.data.HostAddress; -import com.vesoft.nebula.client.graph.exception.AuthFailedException; -import com.vesoft.nebula.client.graph.exception.ClientServerIncompatibleException; -import com.vesoft.nebula.client.graph.exception.IOErrorException; -import com.vesoft.nebula.client.graph.exception.NotValidConnectionException; +import com.vesoft.nebula.client.graph.data.SSLParam; +import com.vesoft.nebula.client.graph.data.SelfSignedSSLParam; import com.vesoft.nebula.client.graph.net.NebulaPool; -import com.vesoft.nebula.client.graph.net.Session; import java.io.Serializable; import java.net.UnknownHostException; import java.util.ArrayList; @@ -51,12 +49,20 @@ public NebulaPool getNebulaPool() throws UnknownHostException { if (nebulaClientOptions.isEnableGraphSSL()) { poolConfig.setEnableSsl(true); switch (nebulaClientOptions.getSSLSighType()) { - case CA: - poolConfig.setSslParam(nebulaClientOptions.getCaSignParam()); + case CA: { + CASignParams caSignParams = nebulaClientOptions.getCaSignParam(); + SSLParam sslParam = new CASignedSSLParam(caSignParams.getCaCrtFilePath(), + caSignParams.getCrtFilePath(), caSignParams.getKeyFilePath()); + poolConfig.setSslParam(sslParam); break; - case SELF: - poolConfig.setSslParam(nebulaClientOptions.getSelfSignParam()); + } + case SELF: { + SelfSignParams selfSignParams = nebulaClientOptions.getSelfSignParam(); + SSLParam sslParam = new SelfSignedSSLParam(selfSignParams.getCrtFilePath(), + selfSignParams.getKeyFilePath(), selfSignParams.getPassword()); + poolConfig.setSslParam(sslParam); break; + } default: throw new IllegalArgumentException("ssl sign type is not supported."); } diff --git a/connector/src/main/java/org.apache.flink/connector/nebula/connection/NebulaMetaConnectionProvider.java b/connector/src/main/java/org.apache.flink/connector/nebula/connection/NebulaMetaConnectionProvider.java index 1fb7499..09e3c38 100644 --- a/connector/src/main/java/org.apache.flink/connector/nebula/connection/NebulaMetaConnectionProvider.java +++ b/connector/src/main/java/org.apache.flink/connector/nebula/connection/NebulaMetaConnectionProvider.java @@ -6,7 +6,10 @@ package org.apache.flink.connector.nebula.connection; import com.facebook.thrift.TException; +import com.vesoft.nebula.client.graph.data.CASignedSSLParam; import com.vesoft.nebula.client.graph.data.HostAddress; +import com.vesoft.nebula.client.graph.data.SSLParam; +import com.vesoft.nebula.client.graph.data.SelfSignedSSLParam; import com.vesoft.nebula.client.graph.exception.ClientServerIncompatibleException; import com.vesoft.nebula.client.meta.MetaClient; import com.vesoft.nebula.client.meta.exception.ExecuteFailedException; @@ -15,11 +18,9 @@ import com.vesoft.nebula.meta.Schema; import com.vesoft.nebula.meta.SpaceItem; import java.io.Serializable; -import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; -import org.apache.flink.connector.nebula.utils.NebulaConstant; import org.apache.flink.connector.nebula.utils.VidTypeEnum; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -36,7 +37,32 @@ public NebulaMetaConnectionProvider(NebulaClientOptions nebulaClientOptions) { public MetaClient getMetaClient() throws TException, ClientServerIncompatibleException { List addresses = nebulaClientOptions.getMetaAddress(); - MetaClient metaClient = new MetaClient(addresses); + int timeout = nebulaClientOptions.getTimeout(); + int retry = nebulaClientOptions.getConnectRetry(); + MetaClient metaClient; + if (nebulaClientOptions.isEnableMetaSSL()) { + switch (nebulaClientOptions.getSSLSighType()) { + case CA: { + CASignParams caSignParams = nebulaClientOptions.getCaSignParam(); + SSLParam sslParam = new CASignedSSLParam(caSignParams.getCaCrtFilePath(), + caSignParams.getCrtFilePath(), caSignParams.getKeyFilePath()); + metaClient = new MetaClient(addresses, timeout, retry, retry, true, sslParam); + break; + } + case SELF: { + SelfSignParams selfSignParams = nebulaClientOptions.getSelfSignParam(); + SSLParam sslParam = new SelfSignedSSLParam(selfSignParams.getCrtFilePath(), + selfSignParams.getKeyFilePath(), selfSignParams.getPassword()); + metaClient = new MetaClient(addresses, timeout, retry, retry, true, sslParam); + break; + } + default: + throw new IllegalArgumentException("ssl sign type is not supported."); + } + } else { + metaClient = new MetaClient(addresses, timeout, retry, retry); + } + metaClient.connect(); return metaClient; } diff --git a/connector/src/main/java/org.apache.flink/connector/nebula/connection/NebulaStorageConnectionProvider.java b/connector/src/main/java/org.apache.flink/connector/nebula/connection/NebulaStorageConnectionProvider.java index 9c7f908..ed12269 100644 --- a/connector/src/main/java/org.apache.flink/connector/nebula/connection/NebulaStorageConnectionProvider.java +++ b/connector/src/main/java/org.apache.flink/connector/nebula/connection/NebulaStorageConnectionProvider.java @@ -6,7 +6,10 @@ package org.apache.flink.connector.nebula.connection; +import com.vesoft.nebula.client.graph.data.CASignedSSLParam; import com.vesoft.nebula.client.graph.data.HostAddress; +import com.vesoft.nebula.client.graph.data.SSLParam; +import com.vesoft.nebula.client.graph.data.SelfSignedSSLParam; import com.vesoft.nebula.client.storage.StorageClient; import java.io.Serializable; import java.util.List; @@ -26,7 +29,34 @@ public NebulaStorageConnectionProvider() { public StorageClient getStorageClient() throws Exception { List addresses = nebulaClientOptions.getMetaAddress(); - StorageClient storageClient = new StorageClient(addresses); + int timeout = nebulaClientOptions.getTimeout(); + int retry = nebulaClientOptions.getConnectRetry(); + StorageClient storageClient; + if (nebulaClientOptions.isEnableStorageSSL()) { + switch (nebulaClientOptions.getSSLSighType()) { + case CA: { + CASignParams caSignParams = nebulaClientOptions.getCaSignParam(); + SSLParam sslParam = new CASignedSSLParam(caSignParams.getCaCrtFilePath(), + caSignParams.getCrtFilePath(), caSignParams.getKeyFilePath()); + storageClient = new StorageClient(addresses, timeout, retry, retry, true, + sslParam); + break; + } + case SELF: { + SelfSignParams selfSignParams = nebulaClientOptions.getSelfSignParam(); + SSLParam sslParam = new SelfSignedSSLParam(selfSignParams.getCrtFilePath(), + selfSignParams.getKeyFilePath(), selfSignParams.getPassword()); + storageClient = new StorageClient(addresses, timeout, retry, retry, true, + sslParam); + break; + } + default: + throw new IllegalArgumentException("ssl sign type is not supported."); + } + } else { + storageClient = new StorageClient(addresses, timeout); + } + if (!storageClient.connect()) { throw new Exception("failed to connect storaged."); } diff --git a/connector/src/main/java/org.apache.flink/connector/nebula/connection/SelfSignParams.java b/connector/src/main/java/org.apache.flink/connector/nebula/connection/SelfSignParams.java new file mode 100644 index 0000000..5e42ed5 --- /dev/null +++ b/connector/src/main/java/org.apache.flink/connector/nebula/connection/SelfSignParams.java @@ -0,0 +1,54 @@ +/* Copyright (c) 2021 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License. + */ + +package org.apache.flink.connector.nebula.connection; + +import java.io.Serializable; + +public class SelfSignParams implements Serializable { + + private String crtFilePath; + private String keyFilePath; + private String password; + + public SelfSignParams(String crtFilePath, String keyFilePath, String password) { + this.crtFilePath = crtFilePath; + this.keyFilePath = keyFilePath; + this.password = password; + } + + public String getCrtFilePath() { + return crtFilePath; + } + + public void setCrtFilePath(String crtFilePath) { + this.crtFilePath = crtFilePath; + } + + public String getKeyFilePath() { + return keyFilePath; + } + + public void setKeyFilePath(String keyFilePath) { + this.keyFilePath = keyFilePath; + } + + public String getPassword() { + return password; + } + + public void setPassword(String password) { + this.password = password; + } + + @Override + public String toString() { + return "SelfSSLSignParams{" + + "crtFilePath='" + crtFilePath + '\'' + + ", keyFilePath='" + keyFilePath + '\'' + + ", password='" + password + '\'' + + '}'; + } +} diff --git a/connector/src/test/java/org/apache/flink/connector/nebula/connection/NebulaClientOptionsTest.java b/connector/src/test/java/org/apache/flink/connector/nebula/connection/NebulaClientOptionsTest.java index 8275dca..7780659 100644 --- a/connector/src/test/java/org/apache/flink/connector/nebula/connection/NebulaClientOptionsTest.java +++ b/connector/src/test/java/org/apache/flink/connector/nebula/connection/NebulaClientOptionsTest.java @@ -51,8 +51,8 @@ public void testIsEnableGraphSsl() { .NebulaClientOptionsBuilder() .setGraphAddress(null) .setMetaAddress("127.0.0.1:9559") - .setEnableGraphSSL(false) - .setEnableMetaSSL(true) + .setEnableMetaSSL(false) + .setEnableStorageSSL(true) .setSSLSignType(SSLSighType.CA) .setCaSignParam("caCrtFile", "crtFile", "keyFile") .build(); diff --git a/example/src/main/java/org/apache/flink/FlinkConnectorExample.java b/example/src/main/java/org/apache/flink/FlinkConnectorExample.java index 37d4463..418a62b 100644 --- a/example/src/main/java/org/apache/flink/FlinkConnectorExample.java +++ b/example/src/main/java/org/apache/flink/FlinkConnectorExample.java @@ -45,6 +45,7 @@ public class FlinkConnectorExample { public static void main(String[] args) { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(5); DataStream> playerSource = constructVertexSourceData(env); sinkVertexData(env, playerSource); updateVertexData(env, playerSource); diff --git a/example/src/main/java/org/apache/flink/FlinkConnectorSourceExample.java b/example/src/main/java/org/apache/flink/FlinkConnectorSourceExample.java index 9c1ffb5..f0040b4 100644 --- a/example/src/main/java/org/apache/flink/FlinkConnectorSourceExample.java +++ b/example/src/main/java/org/apache/flink/FlinkConnectorSourceExample.java @@ -19,6 +19,7 @@ import org.apache.flink.connector.nebula.statement.EdgeExecutionOptions; import org.apache.flink.connector.nebula.statement.ExecutionOptions; import org.apache.flink.connector.nebula.statement.VertexExecutionOptions; +import org.apache.flink.connector.nebula.utils.SSLSighType; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.types.Row; @@ -47,6 +48,8 @@ public class FlinkConnectorSourceExample { private static final Logger LOG = LoggerFactory.getLogger(FlinkConnectorSourceExample.class); private static NebulaStorageConnectionProvider storageConnectionProvider; + private static NebulaStorageConnectionProvider storageConnectionProviderCaSSL; + private static NebulaStorageConnectionProvider storageConnectionProviderSelfSSL; private static ExecutionOptions vertexExecutionOptions; private static ExecutionOptions edgeExecutionOptions; @@ -74,6 +77,30 @@ public static void initConfig() { storageConnectionProvider = new NebulaStorageConnectionProvider(nebulaClientOptions); + NebulaClientOptions nebulaClientOptionsWithCaSSL = + new NebulaClientOptions.NebulaClientOptionsBuilder() + .setEnableMetaSSL(true) + .setEnableStorageSSL(true) + .setSSLSignType(SSLSighType.CA) + .setCaSignParam("example/src/main/resources/ssl/casigned.pem", + "example/src/main/resources/ssl/casigned.crt", + "example/src/main/resources/ssl/casigned.key") + .build(); + storageConnectionProviderCaSSL = + new NebulaStorageConnectionProvider(nebulaClientOptionsWithCaSSL); + + NebulaClientOptions nebulaClientOptionsWithSelfSSL = + new NebulaClientOptions.NebulaClientOptionsBuilder() + .setEnableMetaSSL(true) + .setEnableStorageSSL(true) + .setSSLSignType(SSLSighType.SELF) + .setCaSignParam("example/src/main/resources/ssl/selfsigned.pem", + "example/src/main/resources/ssl/selfsigned.key", + "vesoft") + .build(); + storageConnectionProviderSelfSSL = + new NebulaStorageConnectionProvider(nebulaClientOptionsWithSelfSSL); + // read no property vertexExecutionOptions = new VertexExecutionOptions.ExecutionOptionBuilder() .setGraphSpace("flinkSource")