Skip to content

Commit

Permalink
[FLINK-35991][cdc-runtime] Resolve operator conflicts in transform SQ…
Browse files Browse the repository at this point in the history
…L operator tables
  • Loading branch information
yuxiqian authored and leonardBang committed Aug 12, 2024
1 parent 5284df5 commit 40adeb3
Show file tree
Hide file tree
Showing 3 changed files with 150 additions and 97 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,16 +51,15 @@
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlNodeList;
import org.apache.calcite.sql.SqlOperatorTable;
import org.apache.calcite.sql.SqlSelect;
import org.apache.calcite.sql.fun.SqlStdOperatorTable;
import org.apache.calcite.sql.parser.SqlParseException;
import org.apache.calcite.sql.parser.SqlParser;
import org.apache.calcite.sql.parser.SqlParserPos;
import org.apache.calcite.sql.type.InferTypes;
import org.apache.calcite.sql.type.OperandTypes;
import org.apache.calcite.sql.type.SqlReturnTypeInference;
import org.apache.calcite.sql.type.SqlTypeFactoryImpl;
import org.apache.calcite.sql.util.ListSqlOperatorTable;
import org.apache.calcite.sql.util.SqlOperatorTables;
import org.apache.calcite.sql.validate.SqlConformanceEnum;
import org.apache.calcite.sql.validate.SqlValidator;
Expand Down Expand Up @@ -156,13 +155,10 @@ private static RelNode sqlToRel(
factory,
new CalciteConnectionConfigImpl(new Properties()));
TransformSqlOperatorTable transformSqlOperatorTable = TransformSqlOperatorTable.instance();
SqlStdOperatorTable sqlStdOperatorTable = SqlStdOperatorTable.instance();
ListSqlOperatorTable udfOperatorTable = new ListSqlOperatorTable();
udfFunctions.forEach(udfOperatorTable::add);
SqlOperatorTable udfOperatorTable = SqlOperatorTables.of(udfFunctions);
SqlValidator validator =
SqlValidatorUtil.newValidator(
SqlOperatorTables.chain(
sqlStdOperatorTable, transformSqlOperatorTable, udfOperatorTable),
SqlOperatorTables.chain(transformSqlOperatorTable, udfOperatorTable),
calciteCatalogReader,
factory,
SqlValidator.Config.DEFAULT.withIdentifierExpansion(true));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,18 @@
import org.apache.flink.cdc.runtime.functions.BuiltInScalarFunction;
import org.apache.flink.cdc.runtime.functions.BuiltInTimestampFunction;

import org.apache.calcite.sql.SqlBinaryOperator;
import org.apache.calcite.sql.SqlFunction;
import org.apache.calcite.sql.SqlFunctionCategory;
import org.apache.calcite.sql.SqlIdentifier;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.SqlOperator;
import org.apache.calcite.sql.SqlPostfixOperator;
import org.apache.calcite.sql.SqlPrefixOperator;
import org.apache.calcite.sql.SqlSpecialOperator;
import org.apache.calcite.sql.SqlSyntax;
import org.apache.calcite.sql.fun.SqlCurrentDateFunction;
import org.apache.calcite.sql.fun.SqlBetweenOperator;
import org.apache.calcite.sql.fun.SqlCaseOperator;
import org.apache.calcite.sql.fun.SqlStdOperatorTable;
import org.apache.calcite.sql.type.InferTypes;
import org.apache.calcite.sql.type.OperandTypes;
Expand Down Expand Up @@ -75,6 +80,78 @@ public void lookupOperatorOverloads(
SqlNameMatchers.withCaseSensitive(false));
}

// The following binary functions are sorted in documentation definitions. See
// https://nightlies.apache.org/flink/flink-cdc-docs-master/docs/core-concept/transform/ for a
// full list of CDC supported built-in functions.

// --------------------
// Comparison Functions
// --------------------
public static final SqlBinaryOperator EQUALS = SqlStdOperatorTable.EQUALS;
public static final SqlBinaryOperator NOT_EQUALS = SqlStdOperatorTable.NOT_EQUALS;
public static final SqlBinaryOperator GREATER_THAN = SqlStdOperatorTable.GREATER_THAN;
public static final SqlBinaryOperator GREATER_THAN_OR_EQUAL =
SqlStdOperatorTable.GREATER_THAN_OR_EQUAL;
public static final SqlBinaryOperator LESS_THAN = SqlStdOperatorTable.LESS_THAN;
public static final SqlBinaryOperator LESS_THAN_OR_EQUAL =
SqlStdOperatorTable.LESS_THAN_OR_EQUAL;

public static final SqlPostfixOperator IS_NULL = SqlStdOperatorTable.IS_NULL;
public static final SqlPostfixOperator IS_NOT_NULL = SqlStdOperatorTable.IS_NOT_NULL;

public static final SqlBetweenOperator BETWEEN = SqlStdOperatorTable.BETWEEN;
public static final SqlBetweenOperator NOT_BETWEEN = SqlStdOperatorTable.NOT_BETWEEN;

public static final SqlSpecialOperator LIKE = SqlStdOperatorTable.LIKE;
public static final SqlSpecialOperator NOT_LIKE = SqlStdOperatorTable.NOT_LIKE;

public static final SqlBinaryOperator IN = SqlStdOperatorTable.IN;
public static final SqlBinaryOperator NOT_IN = SqlStdOperatorTable.NOT_IN;

// -----------------
// Logical Functions
// -----------------
public static final SqlBinaryOperator OR = SqlStdOperatorTable.OR;
public static final SqlBinaryOperator AND = SqlStdOperatorTable.AND;
public static final SqlPrefixOperator NOT = SqlStdOperatorTable.NOT;

public static final SqlPostfixOperator IS_FALSE = SqlStdOperatorTable.IS_FALSE;
public static final SqlPostfixOperator IS_NOT_FALSE = SqlStdOperatorTable.IS_NOT_FALSE;
public static final SqlPostfixOperator IS_TRUE = SqlStdOperatorTable.IS_TRUE;
public static final SqlPostfixOperator IS_NOT_TRUE = SqlStdOperatorTable.IS_NOT_TRUE;

// --------------------
// Arithmetic Functions
// --------------------
public static final SqlBinaryOperator PLUS = SqlStdOperatorTable.PLUS;
public static final SqlBinaryOperator MINUS = SqlStdOperatorTable.MINUS;
public static final SqlBinaryOperator MULTIPLY = SqlStdOperatorTable.MULTIPLY;
public static final SqlBinaryOperator DIVIDE = SqlStdOperatorTable.DIVIDE;
public static final SqlBinaryOperator PERCENT_REMAINDER = SqlStdOperatorTable.PERCENT_REMAINDER;

public static final SqlFunction ABS = SqlStdOperatorTable.ABS;
public static final SqlFunction CEIL = SqlStdOperatorTable.CEIL;
public static final SqlFunction FLOOR = SqlStdOperatorTable.FLOOR;
public static final SqlFunction ROUND =
new SqlFunction(
"ROUND",
SqlKind.OTHER_FUNCTION,
TransformSqlReturnTypes.ROUND_FUNCTION_NULLABLE,
null,
OperandTypes.or(OperandTypes.NUMERIC_INTEGER, OperandTypes.NUMERIC),
SqlFunctionCategory.NUMERIC);
public static final SqlFunction UUID =
BuiltInScalarFunction.newBuilder()
.name("UUID")
.returnType(ReturnTypes.explicit(SqlTypeName.CHAR, 36))
.operandTypeChecker(OperandTypes.NILADIC)
.notDeterministic()
.build();

// ----------------
// String Functions
// ----------------
public static final SqlBinaryOperator CONCAT = SqlStdOperatorTable.CONCAT;
public static final SqlFunction CONCAT_FUNCTION =
BuiltInScalarFunction.newBuilder()
.name("CONCAT")
Expand All @@ -85,45 +162,65 @@ public void lookupOperatorOverloads(
.operandTypeChecker(
OperandTypes.repeat(SqlOperandCountRanges.from(1), OperandTypes.STRING))
.build();

public static final SqlFunction CHAR_LENGTH = SqlStdOperatorTable.CHAR_LENGTH;
public static final SqlFunction UPPER = SqlStdOperatorTable.UPPER;
public static final SqlFunction LOWER = SqlStdOperatorTable.LOWER;
public static final SqlFunction TRIM = SqlStdOperatorTable.TRIM;
public static final SqlFunction REGEXP_REPLACE =
new SqlFunction(
"REGEXP_REPLACE",
SqlKind.OTHER_FUNCTION,
ReturnTypes.cascade(
ReturnTypes.explicit(SqlTypeName.VARCHAR),
SqlTypeTransforms.TO_NULLABLE),
null,
OperandTypes.family(
SqlTypeFamily.STRING, SqlTypeFamily.STRING, SqlTypeFamily.STRING),
SqlFunctionCategory.STRING);
public static final SqlFunction SUBSTR =
new SqlFunction(
"SUBSTR",
SqlKind.OTHER_FUNCTION,
TransformSqlReturnTypes.ARG0_VARCHAR_FORCE_NULLABLE,
null,
OperandTypes.or(
OperandTypes.family(SqlTypeFamily.CHARACTER, SqlTypeFamily.INTEGER),
OperandTypes.family(
SqlTypeFamily.CHARACTER,
SqlTypeFamily.INTEGER,
SqlTypeFamily.INTEGER)),
SqlFunctionCategory.STRING);

// ------------------
// Temporal Functions
// ------------------
public static final SqlFunction LOCALTIME = SqlStdOperatorTable.LOCALTIME;
public static final SqlFunction LOCALTIMESTAMP =
new BuiltInTimestampFunction("LOCALTIMESTAMP", SqlTypeName.TIMESTAMP, 3);
public static final SqlFunction CURRENT_TIME =
new BuiltInTimestampFunction("CURRENT_TIME", SqlTypeName.TIME, 0);
public static final SqlFunction CURRENT_DATE = SqlStdOperatorTable.CURRENT_DATE;
public static final SqlFunction CURRENT_TIMESTAMP =
new BuiltInTimestampFunction(
"CURRENT_TIMESTAMP", SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE, 3);
public static final SqlFunction CURRENT_DATE = new SqlCurrentDateFunction();
public static final SqlFunction NOW =
new BuiltInTimestampFunction("NOW", SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE, 3) {
@Override
public SqlSyntax getSyntax() {
return SqlSyntax.FUNCTION;
}
};
public static final SqlFunction TO_DATE =
public static final SqlFunction DATE_FORMAT =
new SqlFunction(
"TO_DATE",
"DATE_FORMAT",
SqlKind.OTHER_FUNCTION,
ReturnTypes.cascade(
ReturnTypes.explicit(SqlTypeName.DATE),
SqlTypeTransforms.FORCE_NULLABLE),
null,
TransformSqlReturnTypes.VARCHAR_FORCE_NULLABLE,
InferTypes.RETURN_TYPE,
OperandTypes.or(
OperandTypes.family(SqlTypeFamily.STRING),
OperandTypes.family(SqlTypeFamily.TIMESTAMP, SqlTypeFamily.STRING),
OperandTypes.family(SqlTypeFamily.STRING, SqlTypeFamily.STRING)),
SqlFunctionCategory.TIMEDATE);
public static final SqlFunction TO_TIMESTAMP =
new SqlFunction(
"TO_TIMESTAMP",
SqlKind.OTHER_FUNCTION,
ReturnTypes.cascade(
ReturnTypes.explicit(SqlTypeName.TIMESTAMP, 3),
SqlTypeTransforms.FORCE_NULLABLE),
null,
OperandTypes.or(
OperandTypes.family(SqlTypeFamily.CHARACTER),
OperandTypes.family(SqlTypeFamily.CHARACTER, SqlTypeFamily.CHARACTER)),
SqlFunctionCategory.TIMEDATE);
public static final SqlFunction TIMESTAMP_DIFF =
new SqlFunction(
"TIMESTAMP_DIFF",
Expand All @@ -135,64 +232,36 @@ public SqlSyntax getSyntax() {
OperandTypes.family(
SqlTypeFamily.ANY, SqlTypeFamily.TIMESTAMP, SqlTypeFamily.TIMESTAMP),
SqlFunctionCategory.TIMEDATE);
public static final SqlFunction REGEXP_REPLACE =
public static final SqlFunction TO_DATE =
new SqlFunction(
"REGEXP_REPLACE",
"TO_DATE",
SqlKind.OTHER_FUNCTION,
ReturnTypes.cascade(
ReturnTypes.explicit(SqlTypeName.VARCHAR),
SqlTypeTransforms.TO_NULLABLE),
null,
OperandTypes.family(
SqlTypeFamily.STRING, SqlTypeFamily.STRING, SqlTypeFamily.STRING),
SqlFunctionCategory.STRING);
public static final SqlFunction SUBSTR =
new SqlFunction(
"SUBSTR",
SqlKind.OTHER_FUNCTION,
TransformSqlReturnTypes.ARG0_VARCHAR_FORCE_NULLABLE,
ReturnTypes.explicit(SqlTypeName.DATE),
SqlTypeTransforms.FORCE_NULLABLE),
null,
OperandTypes.or(
OperandTypes.family(SqlTypeFamily.CHARACTER, SqlTypeFamily.INTEGER),
OperandTypes.family(
SqlTypeFamily.CHARACTER,
SqlTypeFamily.INTEGER,
SqlTypeFamily.INTEGER)),
SqlFunctionCategory.STRING);
public static final SqlFunction ROUND =
OperandTypes.family(SqlTypeFamily.STRING),
OperandTypes.family(SqlTypeFamily.STRING, SqlTypeFamily.STRING)),
SqlFunctionCategory.TIMEDATE);
public static final SqlFunction TO_TIMESTAMP =
new SqlFunction(
"ROUND",
"TO_TIMESTAMP",
SqlKind.OTHER_FUNCTION,
TransformSqlReturnTypes.ROUND_FUNCTION_NULLABLE,
ReturnTypes.cascade(
ReturnTypes.explicit(SqlTypeName.TIMESTAMP, 3),
SqlTypeTransforms.FORCE_NULLABLE),
null,
OperandTypes.or(OperandTypes.NUMERIC_INTEGER, OperandTypes.NUMERIC),
SqlFunctionCategory.NUMERIC);
public static final SqlFunction UUID =
BuiltInScalarFunction.newBuilder()
.name("UUID")
.returnType(ReturnTypes.explicit(SqlTypeName.CHAR, 36))
.operandTypeChecker(OperandTypes.NILADIC)
.notDeterministic()
.build();
public static final SqlFunction MOD = SqlStdOperatorTable.MOD;
public static final SqlFunction LOCALTIME = SqlStdOperatorTable.LOCALTIME;
public static final SqlFunction YEAR = SqlStdOperatorTable.YEAR;
public static final SqlFunction QUARTER = SqlStdOperatorTable.QUARTER;
public static final SqlFunction MONTH = SqlStdOperatorTable.MONTH;
public static final SqlFunction WEEK = SqlStdOperatorTable.WEEK;
public static final SqlFunction TIMESTAMP_ADD = SqlStdOperatorTable.TIMESTAMP_ADD;
public static final SqlOperator BETWEEN = SqlStdOperatorTable.BETWEEN;
public static final SqlOperator SYMMETRIC_BETWEEN = SqlStdOperatorTable.SYMMETRIC_BETWEEN;
public static final SqlOperator NOT_BETWEEN = SqlStdOperatorTable.NOT_BETWEEN;
public static final SqlOperator IN = SqlStdOperatorTable.IN;
public static final SqlOperator NOT_IN = SqlStdOperatorTable.NOT_IN;
public static final SqlFunction CHAR_LENGTH = SqlStdOperatorTable.CHAR_LENGTH;
public static final SqlFunction TRIM = SqlStdOperatorTable.TRIM;
public static final SqlOperator NOT_LIKE = SqlStdOperatorTable.NOT_LIKE;
public static final SqlOperator LIKE = SqlStdOperatorTable.LIKE;
public static final SqlFunction UPPER = SqlStdOperatorTable.UPPER;
public static final SqlFunction LOWER = SqlStdOperatorTable.LOWER;
public static final SqlFunction ABS = SqlStdOperatorTable.ABS;
OperandTypes.or(
OperandTypes.family(SqlTypeFamily.CHARACTER),
OperandTypes.family(SqlTypeFamily.CHARACTER, SqlTypeFamily.CHARACTER)),
SqlFunctionCategory.TIMEDATE);

// ---------------------
// Conditional Functions
// ---------------------
public static final SqlCaseOperator CASE = SqlStdOperatorTable.CASE;
public static final SqlFunction COALESCE = SqlStdOperatorTable.COALESCE;
public static final SqlFunction IF =
new SqlFunction(
"IF",
Expand Down Expand Up @@ -235,17 +304,9 @@ public SqlSyntax getSyntax() {
OperandTypes.family(
SqlTypeFamily.BOOLEAN, SqlTypeFamily.TIME, SqlTypeFamily.TIME)),
SqlFunctionCategory.NUMERIC);
public static final SqlFunction NULLIF = SqlStdOperatorTable.NULLIF;
public static final SqlFunction FLOOR = SqlStdOperatorTable.FLOOR;
public static final SqlFunction CEIL = SqlStdOperatorTable.CEIL;
public static final SqlFunction DATE_FORMAT =
new SqlFunction(
"DATE_FORMAT",
SqlKind.OTHER_FUNCTION,
TransformSqlReturnTypes.VARCHAR_FORCE_NULLABLE,
InferTypes.RETURN_TYPE,
OperandTypes.or(
OperandTypes.family(SqlTypeFamily.TIMESTAMP, SqlTypeFamily.STRING),
OperandTypes.family(SqlTypeFamily.STRING, SqlTypeFamily.STRING)),
SqlFunctionCategory.TIMEDATE);

// --------------
// Cast Functions
// --------------
public static final SqlFunction CAST = SqlStdOperatorTable.CAST;
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,7 @@
import org.apache.calcite.rex.RexBuilder;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlSelect;
import org.apache.calcite.sql.fun.SqlStdOperatorTable;
import org.apache.calcite.sql.type.SqlTypeFactoryImpl;
import org.apache.calcite.sql.util.SqlOperatorTables;
import org.apache.calcite.sql.validate.SqlValidator;
import org.apache.calcite.sql.validate.SqlValidatorUtil;
import org.apache.calcite.sql2rel.RelDecorrelator;
Expand Down Expand Up @@ -101,10 +99,9 @@ public void testTransformCalciteValidate() {
factory,
new CalciteConnectionConfigImpl(new Properties()));
TransformSqlOperatorTable transformSqlOperatorTable = TransformSqlOperatorTable.instance();
SqlStdOperatorTable sqlStdOperatorTable = SqlStdOperatorTable.instance();
SqlValidator validator =
SqlValidatorUtil.newValidator(
SqlOperatorTables.chain(sqlStdOperatorTable, transformSqlOperatorTable),
transformSqlOperatorTable,
calciteCatalogReader,
factory,
SqlValidator.Config.DEFAULT.withIdentifierExpansion(true));
Expand Down Expand Up @@ -144,10 +141,9 @@ public void testCalciteRelNode() {
factory,
new CalciteConnectionConfigImpl(new Properties()));
TransformSqlOperatorTable transformSqlOperatorTable = TransformSqlOperatorTable.instance();
SqlStdOperatorTable sqlStdOperatorTable = SqlStdOperatorTable.instance();
SqlValidator validator =
SqlValidatorUtil.newValidator(
SqlOperatorTables.chain(sqlStdOperatorTable, transformSqlOperatorTable),
transformSqlOperatorTable,
calciteCatalogReader,
factory,
SqlValidator.Config.DEFAULT.withIdentifierExpansion(true));
Expand Down

0 comments on commit 40adeb3

Please sign in to comment.