Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support ssl for flink connector #36

Merged
merged 8 commits into from
Nov 18, 2021
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/* Copyright (c) 2021 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License.
*/

package org.apache.flink.connector.nebula.connection;

import java.io.Serializable;

public class CASignParams implements Serializable {

private String caCrtFilePath;
private String crtFilePath;
private String keyFilePath;

public CASignParams(String caCrtFilePath, String crtFilePath, String keyFilePath) {
this.caCrtFilePath = caCrtFilePath;
this.crtFilePath = crtFilePath;
this.keyFilePath = keyFilePath;
}

public String getCaCrtFilePath() {
return caCrtFilePath;
}

public void setCaCrtFilePath(String caCrtFilePath) {
this.caCrtFilePath = caCrtFilePath;
}

public String getCrtFilePath() {
return crtFilePath;
}

public void setCrtFilePath(String crtFilePath) {
this.crtFilePath = crtFilePath;
}

public String getKeyFilePath() {
return keyFilePath;
}

public void setKeyFilePath(String keyFilePath) {
this.keyFilePath = keyFilePath;
}

@Override
public String toString() {
return "CASSLSignParams{"
+ "caCrtFilePath='" + caCrtFilePath + '\''
+ ", crtFilePath='" + crtFilePath + '\''
+ ", keyFilePath='" + keyFilePath + '\''
+ '}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,7 @@

package org.apache.flink.connector.nebula.connection;

import com.vesoft.nebula.client.graph.data.CASignedSSLParam;
import com.vesoft.nebula.client.graph.data.HostAddress;
import com.vesoft.nebula.client.graph.data.SelfSignedSSLParam;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
Expand All @@ -34,18 +32,21 @@ public class NebulaClientOptions implements Serializable {

private final boolean enableMetaSSL;

private final boolean enableStorageSSL;

private final SSLSighType sslSighType;

private final CASignedSSLParam caSignParam;
private final CASignParams caSSLSignParams;

private final SelfSignedSSLParam selfSignParam;
private final SelfSignParams selfSignParams;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

one with SSL and another with not ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll fix the caSSLSignParams to caSignParams.



private NebulaClientOptions(String metaAddress, String graphAddress, String username,
String password, int timeout, int connectRetry,
boolean enableGraphSSL, boolean enableMetaSSL,
SSLSighType sslSighType, CASignedSSLParam caSignParam,
SelfSignedSSLParam selfSignParam) {
boolean enableStorageSSL,
SSLSighType sslSighType, CASignParams caSSLSignParams,
SelfSignParams selfSignParams) {
this.metaAddress = metaAddress;
this.graphAddress = graphAddress;
this.username = username;
Expand All @@ -54,9 +55,10 @@ private NebulaClientOptions(String metaAddress, String graphAddress, String user
this.connectRetry = connectRetry;
this.enableGraphSSL = enableGraphSSL;
this.enableMetaSSL = enableMetaSSL;
this.enableStorageSSL = enableStorageSSL;
this.sslSighType = sslSighType;
this.caSignParam = caSignParam;
this.selfSignParam = selfSignParam;
this.caSSLSignParams = caSSLSignParams;
this.selfSignParams = selfSignParams;
}

public List<HostAddress> getMetaAddress() {
Expand Down Expand Up @@ -96,16 +98,20 @@ public boolean isEnableMetaSSL() {
return enableMetaSSL;
}

public boolean isEnableStorageSSL() {
return enableStorageSSL;
}

public SSLSighType getSSLSighType() {
return sslSighType;
}

public CASignedSSLParam getCaSignParam() {
return caSignParam;
public CASignParams getCaSignParam() {
return caSSLSignParams;
}

public SelfSignedSSLParam getSelfSignParam() {
return selfSignParam;
public SelfSignParams getSelfSignParam() {
return selfSignParams;
}

/**
Expand All @@ -122,9 +128,10 @@ public static class NebulaClientOptionsBuilder {
// ssl options
private boolean enableGraphSSL = false;
private boolean enableMetaSSL = false;
private boolean enableStorageSSL = false;
private SSLSighType sslSighType = null;
private CASignedSSLParam caSignParam = null;
private SelfSignedSSLParam selfSignParam = null;
private CASignParams caSSLSignParams = null;
private SelfSignParams selfSignParams = null;

public NebulaClientOptionsBuilder setMetaAddress(String metaAddress) {
this.metaAddress = metaAddress;
Expand Down Expand Up @@ -166,48 +173,54 @@ public NebulaClientOptionsBuilder setEnableMetaSSL(boolean enableMetaSSL) {
return this;
}

public NebulaClientOptionsBuilder setEnableStorageSSL(boolean enableStorageSSL) {
this.enableStorageSSL = enableStorageSSL;
return this;
}


public NebulaClientOptionsBuilder setSSLSignType(SSLSighType sslSighType) {
this.sslSighType = sslSighType;
return this;
}

public NebulaClientOptionsBuilder setCaSignParam(String caCrtFilePath, String crtFilePath,
String keyFilePath) {
this.caSignParam = new CASignedSSLParam(caCrtFilePath, crtFilePath,
keyFilePath);
this.caSSLSignParams = new CASignParams(caCrtFilePath, crtFilePath, keyFilePath);
return this;
}

public NebulaClientOptionsBuilder setSelfSignParam(String crtFilePath, String keyFilePath,
String password) {
this.selfSignParam = new SelfSignedSSLParam(crtFilePath, keyFilePath, password);
this.selfSignParams = new SelfSignParams(crtFilePath, keyFilePath, password);
return this;
}

public NebulaClientOptions build() {
if (metaAddress == null || metaAddress.trim().isEmpty()) {
throw new IllegalArgumentException("meta address can not be empty.");
}
if (enableMetaSSL || enableGraphSSL) {
// if meta is set to open ssl, then graph must be set to open ssl
if (enableMetaSSL && !enableGraphSSL) {
if (enableMetaSSL || enableGraphSSL || enableStorageSSL) {
// if storage is set to open ssl, then meta must be set to open ssl
if (enableStorageSSL && !enableMetaSSL) {
throw new IllegalArgumentException(
"meta ssl is enable, graph ssl must be enable");
"storage ssl is enabled, meta ssl must be enabled.");
}

if (sslSighType == null) {
throw new IllegalArgumentException("ssl is enable, ssl sign type must not be "
throw new IllegalArgumentException("ssl is enabled, ssl sign type must not be "
+ "null");
}
switch (sslSighType) {
case CA:
if (caSignParam == null) {
throw new IllegalArgumentException("ssl is enable and sign type is "
if (caSSLSignParams == null) {
throw new IllegalArgumentException("ssl is enabled and sign type is "
+ "CA, caSignParam must not be null");
}
break;
case SELF:
if (selfSignParam == null) {
throw new IllegalArgumentException("ssl is enable and sign type is "
if (selfSignParams == null) {
throw new IllegalArgumentException("ssl is enabled and sign type is "
+ "CA, selfSignParam must not be null");
}
break;
Expand All @@ -226,9 +239,10 @@ public NebulaClientOptions build() {
connectRetry,
enableGraphSSL,
enableMetaSSL,
enableStorageSSL,
sslSighType,
caSignParam,
selfSignParam);
caSSLSignParams,
selfSignParams);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,11 @@


import com.vesoft.nebula.client.graph.NebulaPoolConfig;
import com.vesoft.nebula.client.graph.data.CASignedSSLParam;
import com.vesoft.nebula.client.graph.data.HostAddress;
import com.vesoft.nebula.client.graph.exception.AuthFailedException;
import com.vesoft.nebula.client.graph.exception.ClientServerIncompatibleException;
import com.vesoft.nebula.client.graph.exception.IOErrorException;
import com.vesoft.nebula.client.graph.exception.NotValidConnectionException;
import com.vesoft.nebula.client.graph.data.SSLParam;
import com.vesoft.nebula.client.graph.data.SelfSignedSSLParam;
import com.vesoft.nebula.client.graph.net.NebulaPool;
import com.vesoft.nebula.client.graph.net.Session;
import java.io.Serializable;
import java.net.UnknownHostException;
import java.util.ArrayList;
Expand Down Expand Up @@ -51,12 +49,20 @@ public NebulaPool getNebulaPool() throws UnknownHostException {
if (nebulaClientOptions.isEnableGraphSSL()) {
poolConfig.setEnableSsl(true);
switch (nebulaClientOptions.getSSLSighType()) {
case CA:
poolConfig.setSslParam(nebulaClientOptions.getCaSignParam());
case CA: {
CASignParams caSignParams = nebulaClientOptions.getCaSignParam();
SSLParam sslParam = new CASignedSSLParam(caSignParams.getCaCrtFilePath(),
caSignParams.getCrtFilePath(), caSignParams.getKeyFilePath());
poolConfig.setSslParam(sslParam);
break;
case SELF:
poolConfig.setSslParam(nebulaClientOptions.getSelfSignParam());
}
case SELF: {
SelfSignParams selfSignParams = nebulaClientOptions.getSelfSignParam();
SSLParam sslParam = new SelfSignedSSLParam(selfSignParams.getCrtFilePath(),
selfSignParams.getKeyFilePath(), selfSignParams.getPassword());
poolConfig.setSslParam(sslParam);
break;
}
default:
throw new IllegalArgumentException("ssl sign type is not supported.");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@
package org.apache.flink.connector.nebula.connection;

import com.facebook.thrift.TException;
import com.vesoft.nebula.client.graph.data.CASignedSSLParam;
import com.vesoft.nebula.client.graph.data.HostAddress;
import com.vesoft.nebula.client.graph.data.SSLParam;
import com.vesoft.nebula.client.graph.data.SelfSignedSSLParam;
import com.vesoft.nebula.client.graph.exception.ClientServerIncompatibleException;
import com.vesoft.nebula.client.meta.MetaClient;
import com.vesoft.nebula.client.meta.exception.ExecuteFailedException;
Expand All @@ -15,11 +18,9 @@
import com.vesoft.nebula.meta.Schema;
import com.vesoft.nebula.meta.SpaceItem;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.flink.connector.nebula.utils.NebulaConstant;
import org.apache.flink.connector.nebula.utils.VidTypeEnum;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -36,7 +37,32 @@ public NebulaMetaConnectionProvider(NebulaClientOptions nebulaClientOptions) {

public MetaClient getMetaClient() throws TException, ClientServerIncompatibleException {
List<HostAddress> addresses = nebulaClientOptions.getMetaAddress();
MetaClient metaClient = new MetaClient(addresses);
int timeout = nebulaClientOptions.getTimeout();
int retry = nebulaClientOptions.getConnectRetry();
MetaClient metaClient;
if (nebulaClientOptions.isEnableMetaSSL()) {
switch (nebulaClientOptions.getSSLSighType()) {
case CA: {
CASignParams caSignParams = nebulaClientOptions.getCaSignParam();
SSLParam sslParam = new CASignedSSLParam(caSignParams.getCaCrtFilePath(),
caSignParams.getCrtFilePath(), caSignParams.getKeyFilePath());
metaClient = new MetaClient(addresses, timeout, retry, retry, true, sslParam);
break;
}
case SELF: {
SelfSignParams selfSignParams = nebulaClientOptions.getSelfSignParam();
SSLParam sslParam = new SelfSignedSSLParam(selfSignParams.getCrtFilePath(),
selfSignParams.getKeyFilePath(), selfSignParams.getPassword());
metaClient = new MetaClient(addresses, timeout, retry, retry, true, sslParam);
break;
}
default:
throw new IllegalArgumentException("ssl sign type is not supported.");
}
} else {
metaClient = new MetaClient(addresses, timeout, retry, retry);
}

metaClient.connect();
return metaClient;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@
package org.apache.flink.connector.nebula.connection;


import com.vesoft.nebula.client.graph.data.CASignedSSLParam;
import com.vesoft.nebula.client.graph.data.HostAddress;
import com.vesoft.nebula.client.graph.data.SSLParam;
import com.vesoft.nebula.client.graph.data.SelfSignedSSLParam;
import com.vesoft.nebula.client.storage.StorageClient;
import java.io.Serializable;
import java.util.List;
Expand All @@ -26,7 +29,34 @@ public NebulaStorageConnectionProvider() {

public StorageClient getStorageClient() throws Exception {
List<HostAddress> addresses = nebulaClientOptions.getMetaAddress();
StorageClient storageClient = new StorageClient(addresses);
int timeout = nebulaClientOptions.getTimeout();
int retry = nebulaClientOptions.getConnectRetry();
StorageClient storageClient;
if (nebulaClientOptions.isEnableStorageSSL()) {
switch (nebulaClientOptions.getSSLSighType()) {
case CA: {
CASignParams caSignParams = nebulaClientOptions.getCaSignParam();
SSLParam sslParam = new CASignedSSLParam(caSignParams.getCaCrtFilePath(),
caSignParams.getCrtFilePath(), caSignParams.getKeyFilePath());
storageClient = new StorageClient(addresses, timeout, retry, retry, true,
sslParam);
break;
}
case SELF: {
SelfSignParams selfSignParams = nebulaClientOptions.getSelfSignParam();
SSLParam sslParam = new SelfSignedSSLParam(selfSignParams.getCrtFilePath(),
selfSignParams.getKeyFilePath(), selfSignParams.getPassword());
storageClient = new StorageClient(addresses, timeout, retry, retry, true,
sslParam);
break;
}
default:
throw new IllegalArgumentException("ssl sign type is not supported.");
}
} else {
storageClient = new StorageClient(addresses, timeout);
}

if (!storageClient.connect()) {
throw new Exception("failed to connect storaged.");
}
Expand Down
Loading