Skip to content

Commit

Permalink
[FLINK-36034][cdc-runtime] Get rid of Flink table planner dependency …
Browse files Browse the repository at this point in the history
…in cdc runtime module

This closes apache#3513.
  • Loading branch information
yuxiqian authored and leonardBang committed Aug 12, 2024
1 parent 40adeb3 commit 4f7858d
Show file tree
Hide file tree
Showing 3 changed files with 3 additions and 70 deletions.
28 changes: 0 additions & 28 deletions flink-cdc-runtime/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -44,34 +44,6 @@ limitations under the License.
<groupId>org.apache.calcite</groupId>
<artifactId>calcite-core</artifactId>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_2.12</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
<exclusions>
<exclusion>
<artifactId>value</artifactId>
<groupId>org.immutables</groupId>
</exclusion>
<exclusion>
<artifactId>value-annotations</artifactId>
<groupId>org.immutables</groupId>
</exclusion>
<exclusion>
<artifactId>commons-compiler</artifactId>
<groupId>org.codehaus.janino</groupId>
</exclusion>
<exclusion>
<artifactId>janino</artifactId>
<groupId>org.codehaus.janino</groupId>
</exclusion>
<exclusion>
<artifactId>flink-scala_2.12</artifactId>
<groupId>org.apache.flink</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cdc-common</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package org.apache.flink.cdc.runtime.functions;

import org.apache.flink.annotation.Internal;
import org.apache.flink.table.functions.BuiltInFunctionDefinition;

import org.apache.calcite.sql.SqlFunction;
import org.apache.calcite.sql.SqlFunctionCategory;
Expand All @@ -31,28 +30,17 @@

import javax.annotation.Nullable;

import java.util.Optional;
import java.util.function.Function;

import static org.apache.flink.table.functions.BuiltInFunctionDefinition.DEFAULT_VERSION;
import static org.apache.flink.table.functions.BuiltInFunctionDefinition.qualifyFunctionName;
import static org.apache.flink.table.functions.BuiltInFunctionDefinition.validateFunction;
import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.flink.cdc.common.utils.Preconditions.checkNotNull;

/**
* This is the case when the operator has a special parsing syntax or uses other Calcite-specific
* features that are not exposed via {@link BuiltInFunctionDefinition} yet.
*
* <p>Note: Try to keep usages of this class to a minimum and use Flink's {@link
* BuiltInFunctionDefinition} stack instead.
*
* <p>For simple functions, use the provided builder. Otherwise, this class can also be extended.
* features that are not exposed via {@link SqlFunction} yet.
*/
@Internal
public class BuiltInScalarFunction extends SqlFunction {

private final @Nullable Integer version;

private final boolean isDeterministic;

private final boolean isInternal;
Expand All @@ -61,7 +49,6 @@ public class BuiltInScalarFunction extends SqlFunction {

protected BuiltInScalarFunction(
String name,
int version,
SqlKind kind,
@Nullable SqlReturnTypeInference returnTypeInference,
@Nullable SqlOperandTypeInference operandTypeInference,
Expand All @@ -77,11 +64,9 @@ protected BuiltInScalarFunction(
operandTypeInference,
operandTypeChecker,
checkNotNull(category));
this.version = isInternal ? null : version;
this.isDeterministic = isDeterministic;
this.isInternal = isInternal;
this.monotonicity = monotonicity;
validateFunction(name, version, isInternal);
}

protected BuiltInScalarFunction(
Expand All @@ -93,7 +78,6 @@ protected BuiltInScalarFunction(
SqlFunctionCategory category) {
this(
name,
DEFAULT_VERSION,
kind,
returnTypeInference,
operandTypeInference,
Expand All @@ -109,18 +93,6 @@ public static Builder newBuilder() {
return new Builder();
}

public final Optional<Integer> getVersion() {
return Optional.ofNullable(version);
}

public String getQualifiedName() {
if (isInternal) {
return getName();
}
assert version != null;
return qualifyFunctionName(getName(), version);
}

@Override
public boolean isDeterministic() {
return isDeterministic;
Expand All @@ -144,8 +116,6 @@ public static class Builder {

private String name;

private int version = DEFAULT_VERSION;

private SqlKind kind = SqlKind.OTHER_FUNCTION;

private SqlReturnTypeInference returnTypeInference;
Expand All @@ -163,18 +133,11 @@ public static class Builder {
private Function<SqlOperatorBinding, SqlMonotonicity> monotonicity =
call -> SqlMonotonicity.NOT_MONOTONIC;

/** @see BuiltInFunctionDefinition.Builder#name(String) */
public Builder name(String name) {
this.name = name;
return this;
}

/** @see BuiltInFunctionDefinition.Builder#version(int) */
public Builder version(int version) {
this.version = version;
return this;
}

public Builder kind(SqlKind kind) {
this.kind = kind;
return this;
Expand Down Expand Up @@ -205,7 +168,6 @@ public Builder notDeterministic() {
return this;
}

/** @see BuiltInFunctionDefinition.Builder#internal() */
public Builder internal() {
this.isInternal = true;
return this;
Expand All @@ -224,7 +186,6 @@ public Builder monotonicity(Function<SqlOperatorBinding, SqlMonotonicity> monoto
public BuiltInScalarFunction build() {
return new BuiltInScalarFunction(
name,
version,
kind,
returnTypeInference,
operandTypeInference,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package org.apache.flink.cdc.runtime.operators.schema;

import org.apache.flink.calcite.shaded.com.google.common.collect.ImmutableMap;
import org.apache.flink.cdc.common.data.RecordData;
import org.apache.flink.cdc.common.data.binary.BinaryStringData;
import org.apache.flink.cdc.common.event.AddColumnEvent;
Expand All @@ -41,6 +40,7 @@
import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableMap;
import org.apache.flink.shaded.guava31.com.google.common.collect.Sets;

import org.apache.commons.collections.ListUtils;
Expand Down

0 comments on commit 4f7858d

Please sign in to comment.