Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: flink sql add create space #66

Merged
merged 6 commits into from
Aug 12, 2022
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

package org.apache.flink.connector.nebula.catalog;

import static org.apache.flink.util.Preconditions.checkArgument;

import com.facebook.thrift.TException;
import com.vesoft.nebula.client.graph.data.HostAddress;
import com.vesoft.nebula.client.graph.exception.ClientServerIncompatibleException;
Expand All @@ -29,9 +31,11 @@
import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
import org.apache.flink.table.expressions.Expression;
import org.apache.flink.table.factories.Factory;
import org.apache.flink.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
* AbstractNebulaCatalog is used to get nebula schema
* flink-nebula catalog doesn't support write operators, such as create/alter
Expand All @@ -43,12 +47,21 @@ public abstract class AbstractNebulaCatalog extends AbstractCatalog {
protected final String username;
protected final String password;
protected final String address;

private static final String DEFAULT_DATABASE = "default";

public AbstractNebulaCatalog(String catalogName, String defaultDatabase, String username,
String pwd, String address) {
super(catalogName, defaultDatabase);
// TODO check arguments
super(catalogName, defaultDatabase == null ? DEFAULT_DATABASE : defaultDatabase);
checkArgument(
!StringUtils.isNullOrWhitespaceOnly(username),
"username cannot be null or empty.");
checkArgument(
!StringUtils.isNullOrWhitespaceOnly(pwd),
"password cannot be null or empty.");
checkArgument(
!StringUtils.isNullOrWhitespaceOnly(address),
"address cannot be null or empty."
);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks for the todo work

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

my pleasure.

this.username = username;
this.password = pwd;
this.address = address;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,19 @@
import static org.apache.flink.connector.nebula.table.NebulaDynamicTableFactory.PASSWORD;
import static org.apache.flink.connector.nebula.table.NebulaDynamicTableFactory.USERNAME;
import static org.apache.flink.table.factories.FactoryUtil.CONNECTOR;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.flink.util.StringUtils.isNullOrWhitespaceOnly;

import com.facebook.thrift.TException;
import com.vesoft.nebula.PropertyType;
import com.vesoft.nebula.client.graph.data.HostAddress;
import com.vesoft.nebula.client.graph.data.ResultSet;
import com.vesoft.nebula.client.graph.exception.AuthFailedException;
import com.vesoft.nebula.client.graph.exception.ClientServerIncompatibleException;
import com.vesoft.nebula.client.graph.exception.IOErrorException;
import com.vesoft.nebula.client.graph.exception.NotValidConnectionException;
import com.vesoft.nebula.client.graph.net.NebulaPool;
import com.vesoft.nebula.client.graph.net.Session;
import com.vesoft.nebula.client.meta.MetaClient;
import com.vesoft.nebula.client.meta.exception.ExecuteFailedException;
import com.vesoft.nebula.meta.ColumnDef;
Expand All @@ -29,9 +37,16 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.commons.collections.map.HashedMap;
import org.apache.flink.connector.nebula.connection.NebulaClientOptions;
import org.apache.flink.connector.nebula.connection.NebulaGraphConnectionProvider;
import org.apache.flink.connector.nebula.connection.NebulaMetaConnectionProvider;
import org.apache.flink.connector.nebula.utils.DataTypeEnum;
import org.apache.flink.connector.nebula.utils.NebulaConstant;
import org.apache.flink.connector.nebula.utils.NebulaSpace;
import org.apache.flink.connector.nebula.utils.NebulaSpaces;
import org.apache.flink.connector.nebula.utils.NebulaUtils;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;
Expand All @@ -47,17 +62,69 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


public class NebulaCatalog extends AbstractNebulaCatalog {

private static final Logger LOG = LoggerFactory.getLogger(NebulaCatalog.class);
private final List<HostAddress> hostAndPorts;
private final MetaClient metaClient;

public NebulaCatalog(String catalogName, String defaultDatabase, String username, String pwd,
String address) throws UnknownHostException {
super(catalogName, defaultDatabase, username, pwd, address);
this.hostAndPorts = NebulaUtils.getHostAndPorts(address);
this.metaClient = new MetaClient(hostAndPorts);
private final NebulaClientOptions nebulaClientOptions;
private MetaClient metaClient;
private NebulaPool nebulaPool;
private Session session;

public NebulaCatalog(
String catalogName,
@Nullable String defaultDatabase,
String username,
String password,
String metaAddress,
String graphAddress) {
super(catalogName, defaultDatabase, username, password, metaAddress);
nebulaClientOptions =
new NebulaClientOptions.NebulaClientOptionsBuilder()
.setGraphAddress(graphAddress)
.setMetaAddress(metaAddress)
.setUsername(username)
.setPassword(password)
.build();
}

@Override
public void open() throws CatalogException {
super.open();
NebulaGraphConnectionProvider graphConnectionProvider =
new NebulaGraphConnectionProvider(nebulaClientOptions);
NebulaMetaConnectionProvider metaConnectionProvider =
new NebulaMetaConnectionProvider(nebulaClientOptions);
try {
this.metaClient = metaConnectionProvider.getMetaClient();
} catch (UnknownHostException | ClientServerIncompatibleException e) {
LOG.error("nebula get meta client error, ", e);
throw new CatalogException("nebula get meta client error.", e);
}

try {
nebulaPool = graphConnectionProvider.getNebulaPool();
session = nebulaPool.getSession(graphConnectionProvider.getUserName(),
graphConnectionProvider.getPassword(), true);
} catch (NotValidConnectionException | IOErrorException | AuthFailedException
| ClientServerIncompatibleException | UnknownHostException e) {
LOG.error("failed to get graph session, ", e);
throw new CatalogException("get graph session error, ", e);
}
}

@Override
public void close() throws CatalogException {
super.close();
if (session != null) {
session.release();
}
if (nebulaPool != null) {
nebulaPool.close();
}
if (metaClient != null) {
metaClient.close();
}
}

@Override
Expand Down Expand Up @@ -93,11 +160,65 @@ public CatalogDatabase getDatabase(String databaseName) throws DatabaseNotExistE
}
}

/**
* @param dataBaseName same as graph space name in nebula graph
* @param catalogDatabase catalog implementation
* @param ignoreIfExists true if contains [if not exists] clause else false
*/
@Override
public void createDatabase(String dataBaseName,
CatalogDatabase catalogDatabase,
boolean ignoreIfExists)
throws CatalogException {
checkArgument(
!isNullOrWhitespaceOnly(dataBaseName), "space name cannot be null or empty.");
checkNotNull(catalogDatabase, "space cannot be null.");

if (ignoreIfExists && listDatabases().contains(dataBaseName)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

from the view of product consistency, if ignoreIfExists is true, we should also give the positive feedback.

For Nebula, if a space exists, we'll get SUCCEED when create a repeated space, even though they have different properties, such as vid_type.
image

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure, there are indeed logical inconsistencies, i'll fix it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

According to the doc
image

Now, if the space has already existed and create the space with IF NOT EXISTS clause it will do nothing. just as
ded8300.

Test Result:

 INFO [main] - Repeat to create space, db1 already exists, no effect.

Do you think it's OK to revise this way?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that's fine, thanks for this kindly log info.

LOG.error("failed to create graph space {}, already exists", dataBaseName);
throw new CatalogException("nebula create graph space failed.");
}
Map<String, String> properties = catalogDatabase.getProperties();
Map<String, String> newProperties = properties.entrySet().stream().collect(
Collectors.toMap(
entry -> entry.getKey().toLowerCase(),
entry -> entry.getValue().toUpperCase()
)
);
if (!newProperties.containsKey(NebulaConstant.CREATE_VID_TYPE)) {
LOG.error("failed to create graph space {}, missing VID type param", properties);
throw new CatalogException("nebula create graph space failed, missing VID type.");
}
String vidType = newProperties.get(NebulaConstant.CREATE_VID_TYPE);
if (!NebulaUtils.checkValidVidType(vidType)) {
LOG.error("VID type not satisfy {}", vidType);
throw new CatalogException("nebula graph dont support VID type.");
}
NebulaSpace space = new NebulaSpace(
dataBaseName,catalogDatabase.getComment(), newProperties);
NebulaSpaces nebulaSpaces = new NebulaSpaces(space);
String statement = nebulaSpaces.getCreateStatement();
ResultSet execResult = null;
try {
execResult = session.execute(statement);
} catch (IOErrorException e) {
LOG.error("nebula create graph space execute failed.", e);
throw new CatalogException("nebula create graph space execute failed.");
}

if (execResult.isSucceeded()) {
LOG.debug("create space success.");
} else {
LOG.error("create space failed: {}", execResult.getErrorMessage());
throw new CatalogException("create space failed, " + execResult.getErrorMessage());
}
}

/**
* objectName in tablePath mush start with VERTEX. or EDGE.
*
* @param tablePath A graphSpace name and label name.
* @return
* @return Table exists or not
*/
@Override
public boolean tableExists(ObjectPath tablePath) throws CatalogException {
Expand All @@ -115,7 +236,7 @@ public boolean tableExists(ObjectPath tablePath) throws CatalogException {
* show all tags and edges
*
* @param graphSpace nebula graph space
* @return List of Tag and Edge, tag starts with VERTEX. and edge starts with EDGE. .
* @return List of Tag and Edge, tag starts with VERTEX. and edge starts with EDGE.
*/
@Override
public List<String> listTables(String graphSpace) throws DatabaseNotExistException,
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@

package org.apache.flink.connector.nebula.utils;

import java.net.UnknownHostException;
import org.apache.flink.connector.nebula.catalog.NebulaCatalog;

/**
Expand All @@ -16,9 +15,14 @@ public class NebulaCatalogUtils {
/**
* Create catalog instance from given information
*/
public static NebulaCatalog createNebulaCatalog(String catalogName, String defaultSpace,
String address, String username,
String password) throws UnknownHostException {
return new NebulaCatalog(catalogName, defaultSpace, username, password, address);
public static NebulaCatalog createNebulaCatalog(
String catalogName,
String defaultSpace,
String username,
String password,
String metaAddress,
String graphAddress) {
return new NebulaCatalog(catalogName, defaultSpace, username, password,
metaAddress, graphAddress);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@ public class NebulaConstant {
public static String DELETE_EDGE_TEMPLATE = "DELETE EDGE `%s` %s";
public static String EDGE_ENDPOINT_TEMPLATE = "%s->%s@%d";

// template for create space statement
public static String CREATE_SPACE_TEMPLATE = "CREATE SPACE `%s` (%s)";
public static String CREATE_SPACE_COMMENT = " COMMENT = '%s'";

// Delimiter
public static String COMMA = ",";
public static String SUB_LINE = "_";
Expand All @@ -44,4 +48,12 @@ public class NebulaConstant {
public static final int DEFAULT_CONNECT_RETRY = 3;
public static final int DEFAULT_EXECUTION_RETRY = 3;

// params for create space
public static final String CREATE_VID_TYPE = "vid_type";
public static final String CREATE_PARTITION_NUM = "partition_num";
public static final String CREATE_REPLICA_FACTOR = "replica_factor";

// default params for create space
public static final int DEFAULT_PARTITION_NUM = 100;
public static final int DEFAULT_REPLICA_FACTOR = 1;
}
Loading