Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: flink sql add create space #66

Merged
merged 6 commits into from
Aug 12, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

package org.apache.flink.connector.nebula.catalog;

import static org.apache.flink.util.Preconditions.checkArgument;

import com.facebook.thrift.TException;
import com.vesoft.nebula.client.graph.data.HostAddress;
import com.vesoft.nebula.client.graph.exception.ClientServerIncompatibleException;
Expand All @@ -29,9 +31,11 @@
import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
import org.apache.flink.table.expressions.Expression;
import org.apache.flink.table.factories.Factory;
import org.apache.flink.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
* AbstractNebulaCatalog is used to get nebula schema
* flink-nebula catalog doesn't support write operators, such as create/alter
Expand All @@ -43,14 +47,23 @@ public abstract class AbstractNebulaCatalog extends AbstractCatalog {
protected final String username;
protected final String password;
protected final String address;

private static final String DEFAULT_DATABASE = "default";

public AbstractNebulaCatalog(String catalogName, String defaultDatabase, String username,
String pwd, String address) {
super(catalogName, defaultDatabase);
// TODO check arguments
String password, String address) {
super(catalogName, defaultDatabase == null ? DEFAULT_DATABASE : defaultDatabase);
checkArgument(
!StringUtils.isNullOrWhitespaceOnly(username),
"username cannot be null or empty.");
checkArgument(
!StringUtils.isNullOrWhitespaceOnly(password),
"password cannot be null or empty.");
checkArgument(
!StringUtils.isNullOrWhitespaceOnly(address),
"address cannot be null or empty."
);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks for the todo work

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

my pleasure.

this.username = username;
this.password = pwd;
this.password = password;
this.address = address;
}

Expand Down Expand Up @@ -100,59 +113,64 @@ public Optional<Factory> getFactory() {
* operators for nebula graph
*/
@Override
public boolean databaseExists(String graphName) throws CatalogException {
return listDatabases().contains(graphName);
public boolean databaseExists(String dataBaseName) throws CatalogException {
return listDatabases().contains(dataBaseName);
}

@Override
public void createDatabase(String s, CatalogDatabase catalogDatabase, boolean b)
throws CatalogException {
public void createDatabase(String dataBaseName,
CatalogDatabase catalogDatabase,
boolean ignoreIfExists) throws CatalogException {
throw new UnsupportedOperationException();
}

@Override
public void dropDatabase(String name, boolean ignoreIfNotExists) throws CatalogException {
throw new UnsupportedOperationException();
public void dropDatabase(String dataBaseName, boolean ignoreIfNotExists)
throws CatalogException {
dropDatabase(dataBaseName, ignoreIfNotExists, false);
}

@Override
public void dropDatabase(String s, boolean b, boolean b1) throws CatalogException {
public void dropDatabase(String dataBaseName, boolean ignoreIfNotExists, boolean cascade)
throws CatalogException {
throw new UnsupportedOperationException();
}

@Override
public void alterDatabase(String s, CatalogDatabase catalogDatabase, boolean b)
throws CatalogException {
public void alterDatabase(String dataBaseName, CatalogDatabase newDatabase,
boolean ignoreIfNotExists) throws CatalogException {
throw new UnsupportedOperationException();
}

/**
* operator for nebula graph tag and edge, parameter should be graphName.tag or graphName.edge
* operator for nebula graph tag and edge, parameter should be databaseName.tag or
* databaseName.edge
*/
@Override
public List<String> listViews(String graphName) throws CatalogException {
public List<String> listViews(String databaseName) throws CatalogException {
return Collections.emptyList();
}


@Override
public void dropTable(ObjectPath objectPath, boolean b) throws CatalogException {
public void dropTable(ObjectPath tablePath,boolean ignoreIfNotExists) throws CatalogException {
throw new UnsupportedOperationException();
}

@Override
public void renameTable(ObjectPath objectPath, String s, boolean b) throws CatalogException {
public void renameTable(ObjectPath tablePath, String newTableName, boolean ignoreIfNotExists)
throws CatalogException {
throw new UnsupportedOperationException();
}

@Override
public void createTable(ObjectPath objectPath, CatalogBaseTable catalogBaseTable, boolean b)
public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists)
throws CatalogException {
throw new UnsupportedOperationException();
}

@Override
public void alterTable(ObjectPath objectPath, CatalogBaseTable catalogBaseTable, boolean b)
public void alterTable(
ObjectPath tablePath, CatalogBaseTable newTable, boolean ignoreIfNotExists)
throws CatalogException {
throw new UnsupportedOperationException();
}
Expand All @@ -161,152 +179,163 @@ public void alterTable(ObjectPath objectPath, CatalogBaseTable catalogBaseTable,
* operator for partition
*/
@Override
public List<CatalogPartitionSpec> listPartitions(ObjectPath objectPath)
public List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath)
throws CatalogException {
return Collections.emptyList();
}

@Override
public List<CatalogPartitionSpec> listPartitions(ObjectPath objectPath,
CatalogPartitionSpec catalogPartitionSpec)
public List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath,
CatalogPartitionSpec partitionSpec)
throws CatalogException {
return Collections.emptyList();
}

@Override
public List<CatalogPartitionSpec> listPartitionsByFilter(ObjectPath objectPath,
List<Expression> list)
public List<CatalogPartitionSpec> listPartitionsByFilter(ObjectPath tablePath,
List<Expression> filters)
throws CatalogException {
return Collections.emptyList();
}

@Override
public CatalogPartition getPartition(ObjectPath objectPath,
CatalogPartitionSpec catalogPartitionSpec)
public CatalogPartition getPartition(ObjectPath tablePath,
CatalogPartitionSpec partitionSpec)
throws CatalogException {
throw new UnsupportedOperationException();
}

@Override
public boolean partitionExists(ObjectPath objectPath,
CatalogPartitionSpec catalogPartitionSpec)
public boolean partitionExists(ObjectPath tablePath,
CatalogPartitionSpec partitionSpec)
throws CatalogException {
return false;
}

@Override
public void createPartition(ObjectPath objectPath, CatalogPartitionSpec catalogPartitionSpec,
CatalogPartition catalogPartition, boolean b)
throws CatalogException {
public void createPartition(
ObjectPath tablePath,
CatalogPartitionSpec partitionSpec,
CatalogPartition partition,
boolean ignoreIfExists) throws CatalogException {
throw new UnsupportedOperationException();
}

@Override
public void dropPartition(ObjectPath objectPath, CatalogPartitionSpec catalogPartitionSpec,
boolean b) throws CatalogException {
public void dropPartition(
ObjectPath tablePath, CatalogPartitionSpec partitionSpec, boolean ignoreIfNotExists)
throws CatalogException {
throw new UnsupportedOperationException();
}

@Override
public void alterPartition(ObjectPath objectPath, CatalogPartitionSpec catalogPartitionSpec,
CatalogPartition catalogPartition, boolean b)
throws CatalogException {
public void alterPartition(
ObjectPath tablePath,
CatalogPartitionSpec partitionSpec,
CatalogPartition newPartition,
boolean ignoreIfNotExists) throws CatalogException {
throw new UnsupportedOperationException();
}

/**
* operator for function
*/
@Override
public List<String> listFunctions(String s) throws CatalogException {
public List<String> listFunctions(String dbName) throws CatalogException {
return Collections.emptyList();
}

@Override
public CatalogFunction getFunction(ObjectPath objectPath) throws FunctionNotExistException,
public CatalogFunction getFunction(ObjectPath functionPath) throws FunctionNotExistException,
CatalogException {
throw new FunctionNotExistException(getName(), objectPath);
throw new FunctionNotExistException(getName(), functionPath);
}

@Override
public boolean functionExists(ObjectPath objectPath) throws CatalogException {
public boolean functionExists(ObjectPath functionPath) throws CatalogException {
return false;
}

@Override
public void createFunction(ObjectPath objectPath, CatalogFunction catalogFunction, boolean b)
public void createFunction(
ObjectPath functionPath, CatalogFunction function, boolean ignoreIfExists)
throws CatalogException {
throw new UnsupportedOperationException();
}

@Override
public void alterFunction(ObjectPath objectPath, CatalogFunction catalogFunction, boolean b)
public void alterFunction(
ObjectPath functionPath, CatalogFunction newFunction, boolean ignoreIfNotExists)
throws CatalogException {
throw new UnsupportedOperationException();
}

@Override
public void dropFunction(ObjectPath objectPath, boolean b) throws CatalogException {

public void dropFunction(
ObjectPath functionPath, boolean ignoreIfNotExists) throws CatalogException {
throw new UnsupportedOperationException();
}

/**
* operator for stat
*/
@Override
public CatalogTableStatistics getTableStatistics(ObjectPath objectPath)
throws CatalogException {
public CatalogTableStatistics getTableStatistics(ObjectPath tablePath) throws CatalogException {
return CatalogTableStatistics.UNKNOWN;
}

@Override
public CatalogColumnStatistics getTableColumnStatistics(ObjectPath objectPath)
public CatalogColumnStatistics getTableColumnStatistics(ObjectPath tablePath)
throws CatalogException {
return CatalogColumnStatistics.UNKNOWN;
}

@Override
public CatalogTableStatistics getPartitionStatistics(ObjectPath objectPath,
CatalogPartitionSpec catalogPartitionSpec)
public CatalogTableStatistics getPartitionStatistics(ObjectPath tablePath,
CatalogPartitionSpec partitionSpec)
throws CatalogException {
return CatalogTableStatistics.UNKNOWN;
}

@Override
public CatalogColumnStatistics getPartitionColumnStatistics(
ObjectPath objectPath,
CatalogPartitionSpec catalogPartitionSpec)
ObjectPath tablePath,
CatalogPartitionSpec partitionSpec)
throws CatalogException {
return CatalogColumnStatistics.UNKNOWN;
}

@Override
public void alterTableStatistics(ObjectPath objectPath,
CatalogTableStatistics catalogTableStatistics, boolean b)
throws CatalogException {
public void alterTableStatistics(
ObjectPath tablePath,
CatalogTableStatistics tableStatistics,
boolean ignoreIfNotExists) throws CatalogException {
throw new UnsupportedOperationException();
}

@Override
public void alterTableColumnStatistics(ObjectPath objectPath,
CatalogColumnStatistics catalogColumnStatistics,
boolean b) throws CatalogException {
public void alterTableColumnStatistics(
ObjectPath tablePath,
CatalogColumnStatistics columnStatistics,
boolean ignoreIfNotExists) throws CatalogException {
throw new UnsupportedOperationException();
}

@Override
public void alterPartitionStatistics(ObjectPath objectPath,
CatalogPartitionSpec catalogPartitionSpec,
CatalogTableStatistics catalogTableStatistics,
boolean b) throws CatalogException {
public void alterPartitionStatistics(
ObjectPath tablePath,
CatalogPartitionSpec partitionSpec,
CatalogTableStatistics partitionStatistics,
boolean ignoreIfNotExists) throws CatalogException {
throw new UnsupportedOperationException();
}

@Override
public void alterPartitionColumnStatistics(ObjectPath objectPath,
CatalogPartitionSpec catalogPartitionSpec,
CatalogColumnStatistics catalogColumnStatistics,
boolean b) throws CatalogException {
public void alterPartitionColumnStatistics(
ObjectPath tablePath,
CatalogPartitionSpec partitionSpec,
CatalogColumnStatistics columnStatistics,
boolean ignoreIfNotExists) throws CatalogException {
throw new UnsupportedOperationException();
}
}
Loading