Skip to content

Commit

Permalink
Supports iceberg sink apache#6198
Browse files Browse the repository at this point in the history
  • Loading branch information
sunxiaojian committed Jan 16, 2024
1 parent 1f1b18b commit eb5f773
Show file tree
Hide file tree
Showing 36 changed files with 3,193 additions and 17 deletions.
1 change: 1 addition & 0 deletions plugin-mapping.properties
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ seatunnel.sink.Sentry = connector-sentry
seatunnel.source.MongoDB = connector-mongodb
seatunnel.sink.MongoDB = connector-mongodb
seatunnel.source.Iceberg = connector-iceberg
seatunnel.sink.Iceberg = connector-iceberg
seatunnel.source.InfluxDB = connector-influxdb
seatunnel.source.S3File = connector-file-s3
seatunnel.sink.S3File = connector-file-s3
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@
<hadoop.binary.version>2.7</hadoop.binary.version>
<jackson.version>2.13.3</jackson.version>
<lombok.version>1.18.24</lombok.version>
<commons-compress.version>1.20</commons-compress.version>
<commons-compress.version>1.22</commons-compress.version>
<skip.pmd.check>false</skip.pmd.check>
<maven.deploy.skip>false</maven.deploy.skip>
<maven.javadoc.skip>false</maven.javadoc.skip>
Expand Down
18 changes: 13 additions & 5 deletions seatunnel-connectors-v2/connector-iceberg/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
<name>SeaTunnel : Connectors V2 : Iceberg</name>

<properties>
<iceberg.version>0.14.0</iceberg.version>
<iceberg.version>1.4.2</iceberg.version>
<parquet-avro.version>1.12.3</parquet-avro.version>
<avro.version>1.11.3</avro.version>
<hive.version>2.3.9</hive.version>
Expand All @@ -48,6 +48,12 @@
<artifactId>iceberg-core</artifactId>
<version>${iceberg.version}</version>
</dependency>

<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-common</artifactId>
<version>${iceberg.version}</version>
</dependency>
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-api</artifactId>
Expand Down Expand Up @@ -119,13 +125,15 @@
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-shaded-hadoop-2</artifactId>
<groupId>org.apache.seatunnel</groupId>
<artifactId>seatunnel-hadoop3-3.1.4-uber</artifactId>
<version>${project.version}</version>
<classifier>optional</classifier>
<scope>provided</scope>
<exclusions>
<exclusion>
<groupId>org.apache.avro</groupId>
<artifactId>*</artifactId>
<artifactId>avro</artifactId>
</exclusion>
</exclusions>
</dependency>
Expand Down Expand Up @@ -186,7 +194,7 @@
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>${junit4.version}</version>
<version>4.13.2</version>
<scope>test</scope>
</dependency>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.seatunnel.connectors.seatunnel.iceberg;

import org.apache.seatunnel.connectors.seatunnel.iceberg.config.SourceConfig;
import org.apache.seatunnel.connectors.seatunnel.iceberg.config.CommonConfig;

import org.apache.iceberg.CachingCatalog;
import org.apache.iceberg.Table;
Expand Down Expand Up @@ -67,16 +67,15 @@ public void close() throws IOException {
}
}

public static IcebergTableLoader create(SourceConfig sourceConfig) {
public static IcebergTableLoader create(CommonConfig config) {
IcebergCatalogFactory catalogFactory =
new IcebergCatalogFactory(
sourceConfig.getCatalogName(),
sourceConfig.getCatalogType(),
sourceConfig.getWarehouse(),
sourceConfig.getUri());
config.getCatalogName(),
config.getCatalogType(),
config.getWarehouse(),
config.getUri());
return new IcebergTableLoader(
catalogFactory,
TableIdentifier.of(
Namespace.of(sourceConfig.getNamespace()), sourceConfig.getTable()));
TableIdentifier.of(Namespace.of(config.getNamespace()), config.getTable()));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,249 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.iceberg.config;

import org.apache.seatunnel.shade.com.google.common.annotations.VisibleForTesting;
import org.apache.seatunnel.shade.com.google.common.collect.ImmutableList;
import org.apache.seatunnel.shade.com.google.common.collect.Maps;
import org.apache.seatunnel.shade.com.typesafe.config.Config;

import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.common.config.ConfigRuntimeException;

import lombok.Getter;
import lombok.ToString;

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static java.util.stream.Collectors.toList;

@Getter
@ToString
public class SinkConfig {

public static final int SCHEMA_UPDATE_RETRIES = 2; // 3 total attempts
public static final int CREATE_TABLE_RETRIES = 2; // 3 total attempts

private static final String ID_COLUMNS = "id-columns";
private static final String PARTITION_BY = "partition-by";

public static final Option<String> KEY_CATALOG =
Options.key("catalog")
.stringType()
.defaultValue("iceberg")
.withDescription("Iceberg catalog name");

public static final Option<String> KEY_NAMESPACE =
Options.key("namespace")
.stringType()
.noDefaultValue()
.withDescription("The iceberg namespace");

public static final Option<String> KEY_TABLE =
Options.key("table")
.stringType()
.noDefaultValue()
.withDescription("Target iceberg table name.");

public static final Option<Map<String, String>> CATALOG_PROPS =
Options.key("iceberg.catalog.config")
.mapType()
.noDefaultValue()
.withDescription("Iceberg catalog configs");

public static final Option<Map<String, String>> TABLE_PROPS =
Options.key("iceberg.table.config")
.mapType()
.defaultValue(new HashMap<>())
.withDescription("Iceberg table configs");

public static final Option<Map<String, String>> HADOOP_PROPS =
Options.key("hadoop.config")
.mapType()
.defaultValue(new HashMap<>())
.withDescription("hadoop configs");

public static final Option<Map<String, String>> WRITE_PROPS =
Options.key("iceberg.table.write-props")
.mapType()
.defaultValue(new HashMap<>())
.withDescription("Iceberg table write props");

public static final Option<Boolean> TABLES_AUTO_CREATE_ENABLED =
Options.key("iceberg.table.auto-create-enabled")
.booleanType()
.defaultValue(false)
.withDescription(
"Set to true to automatically create destination table, false otherwise");

public static final Option<Map<String, String>> AUTO_CREATE_PROPS =
Options.key("iceberg.table.auto-create-props")
.mapType()
.defaultValue(new HashMap<>())
.withDescription("Iceberg table auto create props");

public static final Option<String> TABLE_DEFAULT_ID_COLUMNS =
Options.key("iceberg.table.id-columns")
.stringType()
.noDefaultValue()
.withDescription("Default ID columns for tables, comma-separated");

public static final Option<String> TABLE_DEFAULT_PARTITION_BY =
Options.key("iceberg.table.partition-by")
.stringType()
.noDefaultValue()
.withDescription(
"Default partition spec to use when creating tables, comma-separated");

public static final Option<Boolean> TABLE_UPSERT_MODE_ENABLED_PROP =
Options.key("iceberg.table.upsert-mode-enabled")
.booleanType()
.defaultValue(false)
.withDescription(
"Set to true to treat all appends as upserts, false otherwise");

public static final Option<Boolean> TABLE_EVOLVE_SCHEMA_ENABLED_PROP =
Options.key("iceberg.table.evolve-schema-enabled")
.booleanType()
.defaultValue(false)
.withDescription(
"Set to true to add any missing record fields to the table schema, false otherwise");

public static final Option<Boolean> TABLE_SCHEMA_CASE_INSENSITIVE_PROP =
Options.key("iceberg.table.schema-case-insensitive")
.booleanType()
.defaultValue(false)
.withDescription(
"Set to true to look up table columns by case-insensitive name, false for case-sensitive");

@VisibleForTesting private static final String COMMA_NO_PARENS_REGEX = ",(?![^()]*+\\))";

private final Config config;
private final ReadonlyConfig readonlyConfig;
private final Map<String, String> catalogProps;
private final Map<String, String> hadoopProps;
private final Map<String, String> autoCreateProps;
private final Map<String, String> writeProps;
private final Map<String, TableSinkConfig> tableConfigs = Maps.newHashMap();

public SinkConfig(ReadonlyConfig readonlyConfig) {
this.config = readonlyConfig.toConfig();
this.readonlyConfig = readonlyConfig;
this.catalogProps = readonlyConfig.get(CATALOG_PROPS);
this.hadoopProps = readonlyConfig.get(HADOOP_PROPS);
this.autoCreateProps = readonlyConfig.get(AUTO_CREATE_PROPS);
this.writeProps = readonlyConfig.get(WRITE_PROPS);
validate();
}

private void validate() {
checkState(!catalogProps().isEmpty(), "Must specify iceberg catalog config");
}

private void checkState(boolean condition, String msg) {
if (!condition) {
throw new ConfigRuntimeException(msg);
}
}

public Map<String, String> catalogProps() {
return catalogProps;
}

public Map<String, String> hadoopProps() {
return hadoopProps;
}

public Map<String, String> autoCreateProps() {
return autoCreateProps;
}

public Map<String, String> writeProps() {
return writeProps;
}

public String catalog() {
return config.getString(KEY_CATALOG.key());
}

public String namespace() {
return config.getString(KEY_NAMESPACE.key());
}

public String table() {
return config.getString(KEY_TABLE.key());
}

public String idColumns() {
return config.getString(TABLE_DEFAULT_ID_COLUMNS.key());
}

public String partitionBy() {
return config.getString(TABLE_DEFAULT_PARTITION_BY.key());
}

public TableSinkConfig tableConfig(String tableName) {
return tableConfigs.computeIfAbsent(
tableName,
notUsed -> {
Map<String, String> tableConfig = readonlyConfig.get(TABLE_PROPS);
// define id columns
String idColumnsStr = tableConfig.getOrDefault(ID_COLUMNS, idColumns());
List<String> idColumns = stringToList(idColumnsStr, ",");
// define partition columns
String partitionByStr = tableConfig.getOrDefault(PARTITION_BY, partitionBy());
List<String> partitionBy = stringToList(partitionByStr, COMMA_NO_PARENS_REGEX);
return new TableSinkConfig(idColumns, partitionBy);
});
}

@VisibleForTesting
static List<String> stringToList(String value, String regex) {
if (value == null || value.isEmpty()) {
return ImmutableList.of();
}
return Arrays.stream(value.split(regex)).map(String::trim).collect(toList());
}

public boolean upsertModeEnabled() {
return config.getBoolean(TABLE_UPSERT_MODE_ENABLED_PROP.key());
}

public boolean autoCreateEnabled() {
return config.getBoolean(TABLES_AUTO_CREATE_ENABLED.key());
}

public boolean evolveSchemaEnabled() {
return config.getBoolean(TABLE_EVOLVE_SCHEMA_ENABLED_PROP.key());
}

public boolean schemaCaseInsensitive() {
return config.getBoolean(TABLE_SCHEMA_CASE_INSENSITIVE_PROP.key());
}

public String hadoopConfDir() {
return null;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.iceberg.config;

import java.util.List;

public class TableSinkConfig {
private final List<String> idColumns;
private final List<String> partitionBy;

public TableSinkConfig(List<String> idColumns, List<String> partitionBy) {
this.idColumns = idColumns;
this.partitionBy = partitionBy;
}

public List<String> idColumns() {
return idColumns;
}

public List<String> partitionBy() {
return partitionBy;
}
}
Loading

0 comments on commit eb5f773

Please sign in to comment.