diff --git a/connector/src/main/java/org.apache.flink/connector/nebula/catalog/AbstractNebulaCatalog.java b/connector/src/main/java/org.apache.flink/connector/nebula/catalog/AbstractNebulaCatalog.java index 021e8c8..2b7755e 100644 --- a/connector/src/main/java/org.apache.flink/connector/nebula/catalog/AbstractNebulaCatalog.java +++ b/connector/src/main/java/org.apache.flink/connector/nebula/catalog/AbstractNebulaCatalog.java @@ -5,6 +5,8 @@ package org.apache.flink.connector.nebula.catalog; +import static org.apache.flink.util.Preconditions.checkArgument; + import com.facebook.thrift.TException; import com.vesoft.nebula.client.graph.data.HostAddress; import com.vesoft.nebula.client.graph.exception.ClientServerIncompatibleException; @@ -29,9 +31,11 @@ import org.apache.flink.table.catalog.stats.CatalogTableStatistics; import org.apache.flink.table.expressions.Expression; import org.apache.flink.table.factories.Factory; +import org.apache.flink.util.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + /** * AbstractNebulaCatalog is used to get nebula schema * flink-nebula catalog doesn't support write operators, such as create/alter @@ -43,14 +47,23 @@ public abstract class AbstractNebulaCatalog extends AbstractCatalog { protected final String username; protected final String password; protected final String address; - + private static final String DEFAULT_DATABASE = "default"; public AbstractNebulaCatalog(String catalogName, String defaultDatabase, String username, - String pwd, String address) { - super(catalogName, defaultDatabase); - // TODO check arguments + String password, String address) { + super(catalogName, defaultDatabase == null ? 
DEFAULT_DATABASE : defaultDatabase); + checkArgument( + !StringUtils.isNullOrWhitespaceOnly(username), + "username cannot be null or empty."); + checkArgument( + !StringUtils.isNullOrWhitespaceOnly(password), + "password cannot be null or empty."); + checkArgument( + !StringUtils.isNullOrWhitespaceOnly(address), + "address cannot be null or empty." + ); this.username = username; - this.password = pwd; + this.password = password; this.address = address; } @@ -100,59 +113,64 @@ public Optional getFactory() { * operators for nebula graph */ @Override - public boolean databaseExists(String graphName) throws CatalogException { - return listDatabases().contains(graphName); + public boolean databaseExists(String dataBaseName) throws CatalogException { + return listDatabases().contains(dataBaseName); } @Override - public void createDatabase(String s, CatalogDatabase catalogDatabase, boolean b) - throws CatalogException { + public void createDatabase(String dataBaseName, + CatalogDatabase catalogDatabase, + boolean ignoreIfExists) throws CatalogException { throw new UnsupportedOperationException(); } @Override - public void dropDatabase(String name, boolean ignoreIfNotExists) throws CatalogException { - throw new UnsupportedOperationException(); + public void dropDatabase(String dataBaseName, boolean ignoreIfNotExists) + throws CatalogException { + dropDatabase(dataBaseName, ignoreIfNotExists, false); } @Override - public void dropDatabase(String s, boolean b, boolean b1) throws CatalogException { + public void dropDatabase(String dataBaseName, boolean ignoreIfNotExists, boolean cascade) + throws CatalogException { throw new UnsupportedOperationException(); } @Override - public void alterDatabase(String s, CatalogDatabase catalogDatabase, boolean b) - throws CatalogException { + public void alterDatabase(String dataBaseName, CatalogDatabase newDatabase, + boolean ignoreIfNotExists) throws CatalogException { throw new UnsupportedOperationException(); } /** - * operator for 
nebula graph tag and edge, parameter should be graphName.tag or graphName.edge + * operator for nebula graph tag and edge, parameter should be databaseName.tag or + * databaseName.edge */ @Override - public List listViews(String graphName) throws CatalogException { + public List listViews(String databaseName) throws CatalogException { return Collections.emptyList(); } - @Override - public void dropTable(ObjectPath objectPath, boolean b) throws CatalogException { + public void dropTable(ObjectPath tablePath,boolean ignoreIfNotExists) throws CatalogException { throw new UnsupportedOperationException(); } @Override - public void renameTable(ObjectPath objectPath, String s, boolean b) throws CatalogException { + public void renameTable(ObjectPath tablePath, String newTableName, boolean ignoreIfNotExists) + throws CatalogException { throw new UnsupportedOperationException(); } @Override - public void createTable(ObjectPath objectPath, CatalogBaseTable catalogBaseTable, boolean b) + public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists) throws CatalogException { throw new UnsupportedOperationException(); } @Override - public void alterTable(ObjectPath objectPath, CatalogBaseTable catalogBaseTable, boolean b) + public void alterTable( + ObjectPath tablePath, CatalogBaseTable newTable, boolean ignoreIfNotExists) throws CatalogException { throw new UnsupportedOperationException(); } @@ -161,56 +179,61 @@ public void alterTable(ObjectPath objectPath, CatalogBaseTable catalogBaseTable, * operator for partition */ @Override - public List listPartitions(ObjectPath objectPath) + public List listPartitions(ObjectPath tablePath) throws CatalogException { return Collections.emptyList(); } @Override - public List listPartitions(ObjectPath objectPath, - CatalogPartitionSpec catalogPartitionSpec) + public List listPartitions(ObjectPath tablePath, + CatalogPartitionSpec partitionSpec) throws CatalogException { return Collections.emptyList(); } 
@Override - public List listPartitionsByFilter(ObjectPath objectPath, - List list) + public List listPartitionsByFilter(ObjectPath tablePath, + List filters) throws CatalogException { return Collections.emptyList(); } @Override - public CatalogPartition getPartition(ObjectPath objectPath, - CatalogPartitionSpec catalogPartitionSpec) + public CatalogPartition getPartition(ObjectPath tablePath, + CatalogPartitionSpec partitionSpec) throws CatalogException { throw new UnsupportedOperationException(); } @Override - public boolean partitionExists(ObjectPath objectPath, - CatalogPartitionSpec catalogPartitionSpec) + public boolean partitionExists(ObjectPath tablePath, + CatalogPartitionSpec partitionSpec) throws CatalogException { return false; } @Override - public void createPartition(ObjectPath objectPath, CatalogPartitionSpec catalogPartitionSpec, - CatalogPartition catalogPartition, boolean b) - throws CatalogException { + public void createPartition( + ObjectPath tablePath, + CatalogPartitionSpec partitionSpec, + CatalogPartition partition, + boolean ignoreIfExists) throws CatalogException { throw new UnsupportedOperationException(); } @Override - public void dropPartition(ObjectPath objectPath, CatalogPartitionSpec catalogPartitionSpec, - boolean b) throws CatalogException { + public void dropPartition( + ObjectPath tablePath, CatalogPartitionSpec partitionSpec, boolean ignoreIfNotExists) + throws CatalogException { throw new UnsupportedOperationException(); } @Override - public void alterPartition(ObjectPath objectPath, CatalogPartitionSpec catalogPartitionSpec, - CatalogPartition catalogPartition, boolean b) - throws CatalogException { + public void alterPartition( + ObjectPath tablePath, + CatalogPartitionSpec partitionSpec, + CatalogPartition newPartition, + boolean ignoreIfNotExists) throws CatalogException { throw new UnsupportedOperationException(); } @@ -218,95 +241,101 @@ public void alterPartition(ObjectPath objectPath, CatalogPartitionSpec catalogPa * 
operator for function */ @Override - public List listFunctions(String s) throws CatalogException { + public List listFunctions(String dbName) throws CatalogException { return Collections.emptyList(); } @Override - public CatalogFunction getFunction(ObjectPath objectPath) throws FunctionNotExistException, + public CatalogFunction getFunction(ObjectPath functionPath) throws FunctionNotExistException, CatalogException { - throw new FunctionNotExistException(getName(), objectPath); + throw new FunctionNotExistException(getName(), functionPath); } @Override - public boolean functionExists(ObjectPath objectPath) throws CatalogException { + public boolean functionExists(ObjectPath functionPath) throws CatalogException { return false; } @Override - public void createFunction(ObjectPath objectPath, CatalogFunction catalogFunction, boolean b) + public void createFunction( + ObjectPath functionPath, CatalogFunction function, boolean ignoreIfExists) throws CatalogException { throw new UnsupportedOperationException(); } @Override - public void alterFunction(ObjectPath objectPath, CatalogFunction catalogFunction, boolean b) + public void alterFunction( + ObjectPath functionPath, CatalogFunction newFunction, boolean ignoreIfNotExists) throws CatalogException { throw new UnsupportedOperationException(); } @Override - public void dropFunction(ObjectPath objectPath, boolean b) throws CatalogException { - + public void dropFunction( + ObjectPath functionPath, boolean ignoreIfNotExists) throws CatalogException { + throw new UnsupportedOperationException(); } /** * operator for stat */ @Override - public CatalogTableStatistics getTableStatistics(ObjectPath objectPath) - throws CatalogException { + public CatalogTableStatistics getTableStatistics(ObjectPath tablePath) throws CatalogException { return CatalogTableStatistics.UNKNOWN; } @Override - public CatalogColumnStatistics getTableColumnStatistics(ObjectPath objectPath) + public CatalogColumnStatistics 
getTableColumnStatistics(ObjectPath tablePath) throws CatalogException { return CatalogColumnStatistics.UNKNOWN; } @Override - public CatalogTableStatistics getPartitionStatistics(ObjectPath objectPath, - CatalogPartitionSpec catalogPartitionSpec) + public CatalogTableStatistics getPartitionStatistics(ObjectPath tablePath, + CatalogPartitionSpec partitionSpec) throws CatalogException { return CatalogTableStatistics.UNKNOWN; } @Override public CatalogColumnStatistics getPartitionColumnStatistics( - ObjectPath objectPath, - CatalogPartitionSpec catalogPartitionSpec) + ObjectPath tablePath, + CatalogPartitionSpec partitionSpec) throws CatalogException { return CatalogColumnStatistics.UNKNOWN; } @Override - public void alterTableStatistics(ObjectPath objectPath, - CatalogTableStatistics catalogTableStatistics, boolean b) - throws CatalogException { + public void alterTableStatistics( + ObjectPath tablePath, + CatalogTableStatistics tableStatistics, + boolean ignoreIfNotExists) throws CatalogException { throw new UnsupportedOperationException(); } @Override - public void alterTableColumnStatistics(ObjectPath objectPath, - CatalogColumnStatistics catalogColumnStatistics, - boolean b) throws CatalogException { + public void alterTableColumnStatistics( + ObjectPath tablePath, + CatalogColumnStatistics columnStatistics, + boolean ignoreIfNotExists) throws CatalogException { throw new UnsupportedOperationException(); } @Override - public void alterPartitionStatistics(ObjectPath objectPath, - CatalogPartitionSpec catalogPartitionSpec, - CatalogTableStatistics catalogTableStatistics, - boolean b) throws CatalogException { + public void alterPartitionStatistics( + ObjectPath tablePath, + CatalogPartitionSpec partitionSpec, + CatalogTableStatistics partitionStatistics, + boolean ignoreIfNotExists) throws CatalogException { throw new UnsupportedOperationException(); } @Override - public void alterPartitionColumnStatistics(ObjectPath objectPath, - CatalogPartitionSpec 
catalogPartitionSpec, - CatalogColumnStatistics catalogColumnStatistics, - boolean b) throws CatalogException { + public void alterPartitionColumnStatistics( + ObjectPath tablePath, + CatalogPartitionSpec partitionSpec, + CatalogColumnStatistics columnStatistics, + boolean ignoreIfNotExists) throws CatalogException { throw new UnsupportedOperationException(); } } diff --git a/connector/src/main/java/org.apache.flink/connector/nebula/catalog/NebulaCatalog.java b/connector/src/main/java/org.apache.flink/connector/nebula/catalog/NebulaCatalog.java index 2924389..e3b008d 100644 --- a/connector/src/main/java/org.apache.flink/connector/nebula/catalog/NebulaCatalog.java +++ b/connector/src/main/java/org.apache.flink/connector/nebula/catalog/NebulaCatalog.java @@ -12,11 +12,19 @@ import static org.apache.flink.connector.nebula.table.NebulaDynamicTableFactory.PASSWORD; import static org.apache.flink.connector.nebula.table.NebulaDynamicTableFactory.USERNAME; import static org.apache.flink.table.factories.FactoryUtil.CONNECTOR; +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.StringUtils.isNullOrWhitespaceOnly; import com.facebook.thrift.TException; import com.vesoft.nebula.PropertyType; -import com.vesoft.nebula.client.graph.data.HostAddress; +import com.vesoft.nebula.client.graph.data.ResultSet; +import com.vesoft.nebula.client.graph.exception.AuthFailedException; import com.vesoft.nebula.client.graph.exception.ClientServerIncompatibleException; +import com.vesoft.nebula.client.graph.exception.IOErrorException; +import com.vesoft.nebula.client.graph.exception.NotValidConnectionException; +import com.vesoft.nebula.client.graph.net.NebulaPool; +import com.vesoft.nebula.client.graph.net.Session; import com.vesoft.nebula.client.meta.MetaClient; import com.vesoft.nebula.client.meta.exception.ExecuteFailedException; import com.vesoft.nebula.meta.ColumnDef; @@ 
-29,9 +37,16 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; +import javax.annotation.Nullable; import org.apache.commons.collections.map.HashedMap; +import org.apache.flink.connector.nebula.connection.NebulaClientOptions; +import org.apache.flink.connector.nebula.connection.NebulaGraphConnectionProvider; +import org.apache.flink.connector.nebula.connection.NebulaMetaConnectionProvider; import org.apache.flink.connector.nebula.utils.DataTypeEnum; import org.apache.flink.connector.nebula.utils.NebulaConstant; +import org.apache.flink.connector.nebula.utils.NebulaSpace; +import org.apache.flink.connector.nebula.utils.NebulaSpaces; import org.apache.flink.connector.nebula.utils.NebulaUtils; import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.api.TableSchema; @@ -47,17 +62,69 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; + public class NebulaCatalog extends AbstractNebulaCatalog { private static final Logger LOG = LoggerFactory.getLogger(NebulaCatalog.class); - private final List hostAndPorts; - private final MetaClient metaClient; - - public NebulaCatalog(String catalogName, String defaultDatabase, String username, String pwd, - String address) throws UnknownHostException { - super(catalogName, defaultDatabase, username, pwd, address); - this.hostAndPorts = NebulaUtils.getHostAndPorts(address); - this.metaClient = new MetaClient(hostAndPorts); + private final NebulaClientOptions nebulaClientOptions; + private MetaClient metaClient; + private NebulaPool nebulaPool; + private Session session; + + public NebulaCatalog( + String catalogName, + @Nullable String defaultDatabase, + String username, + String password, + String metaAddress, + String graphAddress) { + super(catalogName, defaultDatabase, username, password, metaAddress); + nebulaClientOptions = + new NebulaClientOptions.NebulaClientOptionsBuilder() + .setGraphAddress(graphAddress) + .setMetaAddress(metaAddress) + 
.setUsername(username) + .setPassword(password) + .build(); + } + + @Override + public void open() throws CatalogException { + super.open(); + NebulaGraphConnectionProvider graphConnectionProvider = + new NebulaGraphConnectionProvider(nebulaClientOptions); + NebulaMetaConnectionProvider metaConnectionProvider = + new NebulaMetaConnectionProvider(nebulaClientOptions); + try { + this.metaClient = metaConnectionProvider.getMetaClient(); + } catch (UnknownHostException | ClientServerIncompatibleException e) { + LOG.error("nebula get meta client error, ", e); + throw new CatalogException("nebula get meta client error.", e); + } + + try { + nebulaPool = graphConnectionProvider.getNebulaPool(); + session = nebulaPool.getSession(graphConnectionProvider.getUserName(), + graphConnectionProvider.getPassword(), true); + } catch (NotValidConnectionException | IOErrorException | AuthFailedException + | ClientServerIncompatibleException | UnknownHostException e) { + LOG.error("failed to get graph session, ", e); + throw new CatalogException("get graph session error, ", e); + } + } + + @Override + public void close() throws CatalogException { + super.close(); + if (session != null) { + session.release(); + } + if (nebulaPool != null) { + nebulaPool.close(); + } + if (metaClient != null) { + metaClient.close(); + } } @Override @@ -93,11 +160,65 @@ public CatalogDatabase getDatabase(String databaseName) throws DatabaseNotExistE } } + /** + * @param dataBaseName same as graph space name in nebula graph + * @param catalogDatabase catalog implementation + * @param ignoreIfExists true if contains [if not exists] clause else false + */ + @Override + public void createDatabase(String dataBaseName, + CatalogDatabase catalogDatabase, + boolean ignoreIfExists) + throws CatalogException { + checkArgument( + !isNullOrWhitespaceOnly(dataBaseName), "space name cannot be null or empty."); + checkNotNull(catalogDatabase, "space cannot be null."); + + if (ignoreIfExists && 
listDatabases().contains(dataBaseName)) { + LOG.info("Repeat to create space, {} already exists, no effect.", dataBaseName); + return; + } + Map properties = catalogDatabase.getProperties(); + Map newProperties = properties.entrySet().stream().collect( + Collectors.toMap( + entry -> entry.getKey().toLowerCase(), + entry -> entry.getValue().toUpperCase() + ) + ); + if (!newProperties.containsKey(NebulaConstant.CREATE_VID_TYPE)) { + LOG.error("failed to create graph space {}, missing VID type param", properties); + throw new CatalogException("nebula create graph space failed, missing VID type."); + } + String vidType = newProperties.get(NebulaConstant.CREATE_VID_TYPE); + if (!NebulaUtils.checkValidVidType(vidType)) { + LOG.error("VID type not satisfy {}", vidType); + throw new CatalogException("nebula graph doesn't support VID type."); + } + NebulaSpace space = new NebulaSpace( + dataBaseName, catalogDatabase.getComment(), newProperties); + NebulaSpaces nebulaSpaces = new NebulaSpaces(space); + String statement = nebulaSpaces.getCreateStatement(); + ResultSet execResult = null; + try { + execResult = session.execute(statement); + } catch (IOErrorException e) { + LOG.error("nebula create graph space execute failed.", e); + throw new CatalogException("nebula create graph space execute failed.", e); + } + + if (execResult.isSucceeded()) { + LOG.debug("create space success."); + } else { + LOG.error("create space failed: {}", execResult.getErrorMessage()); + throw new CatalogException("create space failed, " + execResult.getErrorMessage()); + } + } + /** * objectName in tablePath mush start with VERTEX. or EDGE. * * @param tablePath A graphSpace name and label name. 
- * @return + * @return Table exists or not */ @Override public boolean tableExists(ObjectPath tablePath) throws CatalogException { @@ -115,7 +236,7 @@ public boolean tableExists(ObjectPath tablePath) throws CatalogException { * show all tags and edges * * @param graphSpace nebula graph space - * @return List of Tag and Edge, tag starts with VERTEX. and edge starts with EDGE. . + * @return List of Tag and Edge, tag starts with VERTEX. and edge starts with EDGE. */ @Override public List listTables(String graphSpace) throws DatabaseNotExistException, diff --git a/connector/src/main/java/org.apache.flink/connector/nebula/catalog/factory/NebulaDynamicTableSource.java b/connector/src/main/java/org.apache.flink/connector/nebula/catalog/factory/NebulaDynamicTableSource.java deleted file mode 100644 index 6a21cbd..0000000 --- a/connector/src/main/java/org.apache.flink/connector/nebula/catalog/factory/NebulaDynamicTableSource.java +++ /dev/null @@ -1,71 +0,0 @@ -/* Copyright (c) 2020 vesoft inc. All rights reserved. - * - * This source code is licensed under Apache 2.0 License. 
- */ - -package org.apache.flink.connector.nebula.catalog.factory; - -import org.apache.flink.api.common.io.InputFormat; -import org.apache.flink.connector.nebula.connection.NebulaMetaConnectionProvider; -import org.apache.flink.connector.nebula.statement.ExecutionOptions; -import org.apache.flink.connector.nebula.table.NebulaRowDataInputFormat; -import org.apache.flink.table.connector.ChangelogMode; -import org.apache.flink.table.connector.source.DynamicTableSource; -import org.apache.flink.table.connector.source.InputFormatProvider; -import org.apache.flink.table.connector.source.LookupTableSource; -import org.apache.flink.table.connector.source.ScanTableSource; -import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown; -import org.apache.flink.table.data.RowData; - -public class NebulaDynamicTableSource implements ScanTableSource, LookupTableSource, - SupportsProjectionPushDown { - - private final NebulaMetaConnectionProvider metaProvider; - private final ExecutionOptions executionOptions; - - public NebulaDynamicTableSource(NebulaMetaConnectionProvider metaProvider, - ExecutionOptions executionOptions) { - this.metaProvider = metaProvider; - this.executionOptions = executionOptions; - } - - - @Override - public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) { - return null; - } - - @Override - public ChangelogMode getChangelogMode() { - return ChangelogMode.insertOnly(); - } - - @Override - public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) { - return InputFormatProvider.of(getInputFormat(metaProvider)); - } - - @Override - public DynamicTableSource copy() { - return new NebulaDynamicTableSource(metaProvider, executionOptions); - } - - @Override - public String asSummaryString() { - return "Nebula"; - } - - @Override - public boolean supportsNestedProjection() { - return false; - } - - @Override - public void applyProjection(int[][] projectedFields) { - - } - - private 
InputFormat getInputFormat(NebulaMetaConnectionProvider metaProvider) { - return new NebulaRowDataInputFormat(); - } -} diff --git a/connector/src/main/java/org.apache.flink/connector/nebula/utils/NebulaCatalogUtils.java b/connector/src/main/java/org.apache.flink/connector/nebula/utils/NebulaCatalogUtils.java index b78c4d6..f5ff8b7 100644 --- a/connector/src/main/java/org.apache.flink/connector/nebula/utils/NebulaCatalogUtils.java +++ b/connector/src/main/java/org.apache.flink/connector/nebula/utils/NebulaCatalogUtils.java @@ -5,7 +5,6 @@ package org.apache.flink.connector.nebula.utils; -import java.net.UnknownHostException; import org.apache.flink.connector.nebula.catalog.NebulaCatalog; /** @@ -16,9 +15,14 @@ public class NebulaCatalogUtils { /** * Create catalog instance from given information */ - public static NebulaCatalog createNebulaCatalog(String catalogName, String defaultSpace, - String address, String username, - String password) throws UnknownHostException { - return new NebulaCatalog(catalogName, defaultSpace, username, password, address); + public static NebulaCatalog createNebulaCatalog( + String catalogName, + String defaultSpace, + String username, + String password, + String metaAddress, + String graphAddress) { + return new NebulaCatalog(catalogName, defaultSpace, username, password, + metaAddress, graphAddress); } } diff --git a/connector/src/main/java/org.apache.flink/connector/nebula/utils/NebulaConstant.java b/connector/src/main/java/org.apache.flink/connector/nebula/utils/NebulaConstant.java index f28a2a7..34dc67a 100644 --- a/connector/src/main/java/org.apache.flink/connector/nebula/utils/NebulaConstant.java +++ b/connector/src/main/java/org.apache.flink/connector/nebula/utils/NebulaConstant.java @@ -24,6 +24,10 @@ public class NebulaConstant { public static String DELETE_EDGE_TEMPLATE = "DELETE EDGE `%s` %s"; public static String EDGE_ENDPOINT_TEMPLATE = "%s->%s@%d"; + // template for create space statement + public static String 
CREATE_SPACE_TEMPLATE = "CREATE SPACE `%s` (%s)"; + public static String CREATE_SPACE_COMMENT = " COMMENT = '%s'"; + // Delimiter public static String COMMA = ","; public static String SUB_LINE = "_"; @@ -44,4 +48,12 @@ public class NebulaConstant { public static final int DEFAULT_CONNECT_RETRY = 3; public static final int DEFAULT_EXECUTION_RETRY = 3; + // params for create space + public static final String CREATE_VID_TYPE = "vid_type"; + public static final String CREATE_PARTITION_NUM = "partition_num"; + public static final String CREATE_REPLICA_FACTOR = "replica_factor"; + + // default params for create space + public static final int DEFAULT_PARTITION_NUM = 100; + public static final int DEFAULT_REPLICA_FACTOR = 1; } diff --git a/connector/src/main/java/org.apache.flink/connector/nebula/utils/NebulaSpace.java b/connector/src/main/java/org.apache.flink/connector/nebula/utils/NebulaSpace.java new file mode 100644 index 0000000..6f532f5 --- /dev/null +++ b/connector/src/main/java/org.apache.flink/connector/nebula/utils/NebulaSpace.java @@ -0,0 +1,58 @@ +/* Copyright (c) 2022 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License. 
+ */ + +package org.apache.flink.connector.nebula.utils; + +import java.io.Serializable; +import java.util.Map; + +public class NebulaSpace implements Serializable { + + private String spaceName; + private String comment; + private Map props; + + public NebulaSpace() { + } + + public NebulaSpace(String spaceName, String comment, Map props) { + this.spaceName = spaceName; + this.comment = comment; + this.props = props; + } + + public String getSpaceName() { + return spaceName; + } + + public void setSpaceName(String spaceName) { + this.spaceName = spaceName; + } + + public String getComment() { + return comment; + } + + public void setComment(String comment) { + this.comment = comment; + } + + public Map getProps() { + return props; + } + + public void setProps(Map props) { + this.props = props; + } + + @Override + public String toString() { + return "NebulaSpace{" + + "spaceName='" + spaceName + '\'' + + ", comment='" + comment + '\'' + + ", props=" + props + + '}'; + } +} diff --git a/connector/src/main/java/org.apache.flink/connector/nebula/utils/NebulaSpaces.java b/connector/src/main/java/org.apache.flink/connector/nebula/utils/NebulaSpaces.java new file mode 100644 index 0000000..300ca93 --- /dev/null +++ b/connector/src/main/java/org.apache.flink/connector/nebula/utils/NebulaSpaces.java @@ -0,0 +1,57 @@ +/* Copyright (c) 2022 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License. + */ + +package org.apache.flink.connector.nebula.utils; + +import java.io.Serializable; +import java.util.Map; + +public class NebulaSpaces implements Serializable { + + private final NebulaSpace nebulaSpace; + private final Map props; + + public NebulaSpaces(NebulaSpace nebulaSpace) { + this.nebulaSpace = nebulaSpace; + this.props = nebulaSpace.getProps(); + } + + /** + * construct Nebula create space ngql. 
+ * + * @return ngql + */ + public String getCreateStatement() { + Map props = nebulaSpace.getProps(); + StringBuilder sb = new StringBuilder(); + addParams(sb, NebulaConstant.CREATE_PARTITION_NUM, NebulaConstant.DEFAULT_PARTITION_NUM); + addParams(sb, NebulaConstant.CREATE_REPLICA_FACTOR, NebulaConstant.DEFAULT_REPLICA_FACTOR); + sb.append(NebulaConstant.CREATE_VID_TYPE) + .append(" = ") + .append(props.get(NebulaConstant.CREATE_VID_TYPE)); + + String stat = String.format( + NebulaConstant.CREATE_SPACE_TEMPLATE, + nebulaSpace.getSpaceName(), + sb + ); + + String comment = nebulaSpace.getComment(); + if (comment != null) { + stat += String.format(NebulaConstant.CREATE_SPACE_COMMENT, comment); + } + + return stat; + } + + private void addParams(StringBuilder sb, String para, int defaultValue) { + sb.append(para) + .append(" = ") + .append(props.containsKey(para) + ? Integer.parseInt(props.get(para)) + : defaultValue) + .append(", "); + } +} diff --git a/connector/src/main/java/org.apache.flink/connector/nebula/utils/NebulaUtils.java b/connector/src/main/java/org.apache.flink/connector/nebula/utils/NebulaUtils.java index 902afd2..d21a80b 100644 --- a/connector/src/main/java/org.apache.flink/connector/nebula/utils/NebulaUtils.java +++ b/connector/src/main/java/org.apache.flink/connector/nebula/utils/NebulaUtils.java @@ -9,6 +9,8 @@ import com.vesoft.nebula.client.graph.data.HostAddress; import java.util.ArrayList; import java.util.List; +import java.util.regex.Matcher; +import java.util.regex.Pattern; public class NebulaUtils { @@ -116,4 +118,20 @@ public static String mkString(String value, String start, String sep, String end builder.append(end); return builder.toString(); } + + /** + * Check valid VID definition + * @param vidType vid define string + * @return true if INT | INT64 | FIXED_STRING(n) + */ + public static boolean checkValidVidType(String vidType) { + if ("INT".equals(vidType) || "INT64".equals(vidType)) { + return true; + } + String regex = 
"FIXED_STRING\\(\\d+\\)"; + Pattern pattern = Pattern.compile(regex, Pattern.MULTILINE); + Matcher matcher = pattern.matcher(vidType); + + return matcher.matches(); + } } diff --git a/connector/src/test/java/org/apache/flink/connector/nebula/catalog/NebulaCatalogCreateSpaceTest.java b/connector/src/test/java/org/apache/flink/connector/nebula/catalog/NebulaCatalogCreateSpaceTest.java new file mode 100644 index 0000000..c4e75d9 --- /dev/null +++ b/connector/src/test/java/org/apache/flink/connector/nebula/catalog/NebulaCatalogCreateSpaceTest.java @@ -0,0 +1,69 @@ +/* Copyright (c) 2022 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License. + */ + +package org.apache.flink.connector.nebula.catalog; + +import org.apache.flink.connector.nebula.utils.NebulaCatalogUtils; +import org.apache.flink.table.api.EnvironmentSettings; +import org.apache.flink.table.api.TableEnvironment; +import org.junit.Test; + +public class NebulaCatalogCreateSpaceTest { + + private static final String CATALOG_NAME = "NebulaCatalog"; + private static final String GRAPH_SPACE = "default"; + private static final String USERNAME = "root"; + private static final String PASSWORD = "nebula"; + private static final String META_ADDRESS = "127.0.0.1:9559"; + private static final String GRAPH_ADDRESS = "127.0.0.1:9669"; + + @Test + public void testCreateGraphSpace() { + NebulaCatalog nebulaCatalog = NebulaCatalogUtils.createNebulaCatalog( + CATALOG_NAME, + GRAPH_SPACE, + USERNAME, + PASSWORD, + META_ADDRESS, + GRAPH_ADDRESS + ); + + EnvironmentSettings settings = EnvironmentSettings.newInstance() + .inStreamingMode() + .build(); + TableEnvironment tableEnv = TableEnvironment.create(settings); + + tableEnv.registerCatalog(CATALOG_NAME, nebulaCatalog); + tableEnv.useCatalog(CATALOG_NAME); + + String createDataBase1 = "CREATE DATABASE IF NOT EXISTS `db1`" + + " COMMENT 'space 1'" + + " WITH (" + + " 'partition_num' = '100'," + + " 'replica_factor' = '3'," + + " 
'vid_type' = 'FIXED_STRING(10)'" + + ")"; + + String createDataBase2 = "CREATE DATABASE IF NOT EXISTS `db2`" + + " COMMENT 'space 2'" + + " WITH (" + + " 'partition_num' = '10'," + + " 'replica_factor' = '2'," + + " 'vid_type' = 'INT'" + + ")"; + + String createSameDataBase = "CREATE DATABASE IF NOT EXISTS `db1`" + + " COMMENT 'same name as space 1'" + + " WITH (" + + " 'partition_num' = '10'," + + " 'replica_factor' = '2'," + + " 'vid_type' = 'INT64'" + + ")"; + + tableEnv.executeSql(createDataBase1); + + tableEnv.executeSql(createDataBase2); + + tableEnv.executeSql(createSameDataBase); + } +} diff --git a/connector/src/test/java/org/apache/flink/connector/nebula/utils/NebulaUtilsTest.java b/connector/src/test/java/org/apache/flink/connector/nebula/utils/NebulaUtilsTest.java index ba22c87..a88eaae 100644 --- a/connector/src/test/java/org/apache/flink/connector/nebula/utils/NebulaUtilsTest.java +++ b/connector/src/test/java/org/apache/flink/connector/nebula/utils/NebulaUtilsTest.java @@ -62,4 +62,16 @@ public void testMkString() { assertEquals("\"test\"", NebulaUtils.mkString("test", "\"", "", "\"")); assertEquals("\"t,e,s,t\"", NebulaUtils.mkString("test", "\"", ",", "\"")); } + + @Test + public void testCheckValidVidType() { + assertTrue(NebulaUtils.checkValidVidType("INT")); + assertTrue(NebulaUtils.checkValidVidType("INT64")); + assertTrue(NebulaUtils.checkValidVidType("FIXED_STRING(10)")); + + assertFalse(NebulaUtils.checkValidVidType("INT32")); + assertFalse(NebulaUtils.checkValidVidType("FIXED_STRING")); + assertFalse(NebulaUtils.checkValidVidType("FIXED_STRING(-1)")); + assertFalse(NebulaUtils.checkValidVidType("FIXED_STRING(aaa)")); + } }