Commit

sunxiaojian committed Jan 12, 2024
1 parent 1f1b18b commit 3ac7eb9
Showing 25 changed files with 2,735 additions and 0 deletions.
5 changes: 5 additions & 0 deletions seatunnel-connectors-v2/connector-iceberg/pom.xml
@@ -63,6 +63,11 @@
<artifactId>iceberg-orc</artifactId>
<version>${iceberg.version}</version>
</dependency>
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-common</artifactId>
<version>${iceberg.version}</version>
</dependency>
<dependency>
<groupId>org.apache.orc</groupId>
<artifactId>orc-core</artifactId>
248 changes: 248 additions & 0 deletions seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/config/SinkConfig.java
@@ -0,0 +1,248 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.iceberg.config;

import org.apache.seatunnel.shade.com.google.common.annotations.VisibleForTesting;
import org.apache.seatunnel.shade.com.google.common.collect.ImmutableList;
import org.apache.seatunnel.shade.com.google.common.collect.Maps;
import org.apache.seatunnel.shade.com.typesafe.config.Config;

import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.common.config.ConfigRuntimeException;

import lombok.Getter;
import lombok.ToString;

import java.util.Arrays;
import java.util.List;
import java.util.Map;

import static java.util.stream.Collectors.toList;

@Getter
@ToString
public class SinkConfig {

public static final int SCHEMA_UPDATE_RETRIES = 2; // 3 total attempts
public static final int CREATE_TABLE_RETRIES = 2; // 3 total attempts

private static final String ID_COLUMNS = "id-columns";
private static final String PARTITION_BY = "partition-by";

public static final Option<String> KEY_CATALOG =
Options.key("catalog")
.stringType()
.defaultValue("iceberg")
.withDescription("Iceberg catalog name");

public static final Option<String> KEY_NAMESPACE =
Options.key("namespace")
.stringType()
.noDefaultValue()
.withDescription("The iceberg namespace");

public static final Option<String> KEY_TABLE =
Options.key("table")
.stringType()
.noDefaultValue()
.withDescription("Target iceberg table name.");

public static final Option<Map<String, String>> CATALOG_PROPS =
Options.key("iceberg.catalog.config")
.mapType()
.noDefaultValue()
.withDescription("Iceberg catalog configs");

public static final Option<Map<String, String>> TABLE_PROPS =
Options.key("iceberg.table.config")
.mapType()
.noDefaultValue()
.withDescription("Iceberg table configs");

public static final Option<Map<String, String>> HADOOP_PROPS =
Options.key("hadoop.config")
.mapType()
.noDefaultValue()
.withDescription("hadoop configs");

public static final Option<Map<String, String>> WRITE_PROPS =
Options.key("iceberg.table.write-props")
.mapType()
.noDefaultValue()
.withDescription("Iceberg table write props");

public static final Option<Boolean> TABLES_AUTO_CREATE_ENABLED =
Options.key("iceberg.tables.auto-create-enabled")
.booleanType()
.defaultValue(false)
.withDescription(
"Set to true to automatically create destination tables, false otherwise");

public static final Option<Map<String, String>> AUTO_CREATE_PROPS =
Options.key("iceberg.tables.auto-create-props")
.mapType()
.noDefaultValue()
.withDescription("Iceberg tables auto create props");

public static final Option<String> TABLES_DEFAULT_ID_COLUMNS =
Options.key("iceberg.tables.default-id-columns")
.stringType()
.noDefaultValue()
.withDescription("Default ID columns for tables, comma-separated");

public static final Option<String> TABLES_DEFAULT_PARTITION_BY =
Options.key("iceberg.tables.default-partition-by")
.stringType()
.noDefaultValue()
.withDescription(
"Default partition spec to use when creating tables, comma-separated");

public static final Option<Boolean> TABLES_UPSERT_MODE_ENABLED_PROP =
Options.key("iceberg.tables.upsert-mode-enabled")
.booleanType()
.defaultValue(false)
.withDescription(
"Set to true to treat all appends as upserts, false otherwise");

public static final Option<Boolean> TABLES_EVOLVE_SCHEMA_ENABLED_PROP =
Options.key("iceberg.tables.evolve-schema-enabled")
.booleanType()
.defaultValue(false)
.withDescription(
"Set to true to add any missing record fields to the table schema, false otherwise");

public static final Option<Boolean> TABLES_SCHEMA_CASE_INSENSITIVE_PROP =
Options.key("iceberg.tables.schema-case-insensitive")
.booleanType()
.defaultValue(false)
.withDescription(
"Set to true to look up table columns by case-insensitive name, false for case-sensitive");

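// Splits only on commas that sit outside parentheses, so partition transforms
// such as bucket(id, 16) are kept intact as a single element.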
@VisibleForTesting private static final String COMMA_NO_PARENS_REGEX = ",(?![^()]*+\\))";

private final Config config;
private final ReadonlyConfig readonlyConfig;
private final Map<String, String> catalogProps;
private final Map<String, String> hadoopProps;
private final Map<String, String> autoCreateProps;
private final Map<String, String> writeProps;
private final Map<String, TableSinkConfig> tableConfigs = Maps.newHashMap();

public SinkConfig(ReadonlyConfig readonlyConfig) {
this.config = readonlyConfig.toConfig();
this.readonlyConfig = readonlyConfig;
this.catalogProps = readonlyConfig.get(CATALOG_PROPS);
this.hadoopProps = readonlyConfig.get(HADOOP_PROPS);
this.autoCreateProps = readonlyConfig.get(AUTO_CREATE_PROPS);
this.writeProps = readonlyConfig.get(WRITE_PROPS);
validate();
}

private void validate() {
checkState(!catalogProps().isEmpty(), "Must specify iceberg catalog config");
}

private void checkState(boolean condition, String msg) {
if (!condition) {
throw new ConfigRuntimeException(msg);
}
}

public Map<String, String> catalogProps() {
return catalogProps;
}

public Map<String, String> hadoopProps() {
return hadoopProps;
}

public Map<String, String> autoCreateProps() {
return autoCreateProps;
}

public Map<String, String> writeProps() {
return writeProps;
}

public String catalog() {
return config.getString(KEY_CATALOG.key());
}

public String namespace() {
return config.getString(KEY_NAMESPACE.key());
}

public String table() {
return config.getString(KEY_TABLE.key());
}

public String idColumns() {
return config.getString(TABLES_DEFAULT_ID_COLUMNS.key());
}

public String partitionBy() {
return config.getString(TABLES_DEFAULT_PARTITION_BY.key());
}

public TableSinkConfig tableConfig(String tableName) {
return tableConfigs.computeIfAbsent(
tableName,
notUsed -> {
Map<String, String> tableConfig = readonlyConfig.get(TABLE_PROPS);
// define id columns
String idColumnsStr = tableConfig.getOrDefault(ID_COLUMNS, idColumns());
List<String> idColumns = stringToList(idColumnsStr, ",");
// define partition columns
String partitionByStr = tableConfig.getOrDefault(PARTITION_BY, partitionBy());
List<String> partitionBy = stringToList(partitionByStr, COMMA_NO_PARENS_REGEX);
return new TableSinkConfig(idColumns, partitionBy);
});
}

@VisibleForTesting
static List<String> stringToList(String value, String regex) {
if (value == null || value.isEmpty()) {
return ImmutableList.of();
}
return Arrays.stream(value.split(regex)).map(String::trim).collect(toList());
}

public boolean upsertModeEnabled() {
return config.getBoolean(TABLES_UPSERT_MODE_ENABLED_PROP.key());
}

public boolean autoCreateEnabled() {
return config.getBoolean(TABLES_AUTO_CREATE_ENABLED.key());
}

public boolean evolveSchemaEnabled() {
return config.getBoolean(TABLES_EVOLVE_SCHEMA_ENABLED_PROP.key());
}

public boolean schemaCaseInsensitive() {
return config.getBoolean(TABLES_SCHEMA_CASE_INSENSITIVE_PROP.key());
}

public String hadoopConfDir() {
return null;
}
}
40 changes: 40 additions & 0 deletions seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/config/TableSinkConfig.java
@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.iceberg.config;

import java.util.List;

public class TableSinkConfig {
private final List<String> idColumns;
private final List<String> partitionBy;

public TableSinkConfig(List<String> idColumns, List<String> partitionBy) {
this.idColumns = idColumns;
this.partitionBy = partitionBy;
}

public List<String> idColumns() {
return idColumns;
}

public List<String> partitionBy() {
return partitionBy;
}
}
95 changes: 95 additions & 0 deletions seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/IcebergSink.java
@@ -0,0 +1,95 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.iceberg.sink;

import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.serialization.DefaultSerializer;
import org.apache.seatunnel.api.serialization.Serializer;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.connectors.seatunnel.iceberg.config.SinkConfig;
import org.apache.seatunnel.connectors.seatunnel.iceberg.sink.commit.IcebergAggregatedCommitInfo;
import org.apache.seatunnel.connectors.seatunnel.iceberg.sink.commit.IcebergAggregatedCommitter;
import org.apache.seatunnel.connectors.seatunnel.iceberg.sink.commit.IcebergCommitInfo;
import org.apache.seatunnel.connectors.seatunnel.iceberg.sink.state.IcebergSinkState;
import org.apache.seatunnel.connectors.seatunnel.iceberg.utils.Utilities;

import org.apache.iceberg.catalog.Catalog;

import com.google.auto.service.AutoService;

import java.io.IOException;
import java.util.List;
import java.util.Optional;

@AutoService(SeaTunnelSink.class)
public class IcebergSink
implements SeaTunnelSink<
SeaTunnelRow, IcebergSinkState, IcebergCommitInfo, IcebergAggregatedCommitInfo> {
private static final String PLUGIN_NAME = "Iceberg";
private SinkConfig sinkConfig;
private SeaTunnelRowType seaTunnelRowType;
private Catalog catalog;

public IcebergSink(ReadonlyConfig pluginConfig, SeaTunnelRowType rowType) {
this.sinkConfig = new SinkConfig(pluginConfig);
this.seaTunnelRowType = rowType;
this.catalog = Utilities.loadCatalog(sinkConfig);
}

@Override
public String getPluginName() {
return PLUGIN_NAME;
}

@Override
public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) {
this.seaTunnelRowType = seaTunnelRowType;
}

@Override
public SinkWriter<SeaTunnelRow, IcebergCommitInfo, IcebergSinkState> createWriter(
SinkWriter.Context context) throws IOException {
return IcebergSinkWriter.of(catalog, sinkConfig, seaTunnelRowType, null);
}

@Override
public SinkWriter<SeaTunnelRow, IcebergCommitInfo, IcebergSinkState> restoreWriter(
SinkWriter.Context context, List<IcebergSinkState> states) throws IOException {
return IcebergSinkWriter.of(catalog, sinkConfig, seaTunnelRowType, states);
}

@Override
public Optional<SinkAggregatedCommitter<IcebergCommitInfo, IcebergAggregatedCommitInfo>>
createAggregatedCommitter() throws IOException {
return Optional.of(new IcebergAggregatedCommitter(catalog, sinkConfig));
}

@Override
public Optional<Serializer<IcebergAggregatedCommitInfo>> getAggregatedCommitInfoSerializer() {
return Optional.of(new DefaultSerializer<>());
}

@Override
public Optional<Serializer<IcebergCommitInfo>> getCommitInfoSerializer() {
return Optional.of(new DefaultSerializer<>());
}
}
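A rough wiring sketch (not part of the commit) showing how the pieces above could be exercised by hand, for example from a test. It assumes ReadonlyConfig.fromMap(...) and the SeaTunnel row-type API are available, uses placeholder catalog properties and a local Hadoop-catalog warehouse path, and skips the checkpoint/commit cycle; a real run also needs the Iceberg and Hadoop client jars on the classpath.

import java.util.HashMap;
import java.util.Map;

import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.connectors.seatunnel.iceberg.sink.IcebergSink;
import org.apache.seatunnel.connectors.seatunnel.iceberg.sink.commit.IcebergCommitInfo;
import org.apache.seatunnel.connectors.seatunnel.iceberg.sink.state.IcebergSinkState;

public class IcebergSinkWiringSketch {

    public static void main(String[] args) throws Exception {
        // Options mirror the keys declared in SinkConfig; values here are placeholders.
        Map<String, Object> options = new HashMap<>();
        options.put("catalog", "iceberg");
        options.put("namespace", "default");
        options.put("table", "orders");

        // SinkConfig#validate rejects an empty iceberg.catalog.config.
        Map<String, String> catalogProps = new HashMap<>();
        catalogProps.put("type", "hadoop");
        catalogProps.put("warehouse", "file:///tmp/iceberg/warehouse");
        options.put("iceberg.catalog.config", catalogProps);

        SeaTunnelRowType rowType =
                new SeaTunnelRowType(
                        new String[] {"id", "category"},
                        new SeaTunnelDataType<?>[] {BasicType.LONG_TYPE, BasicType.STRING_TYPE});

        IcebergSink sink = new IcebergSink(ReadonlyConfig.fromMap(options), rowType);
        SinkWriter<SeaTunnelRow, IcebergCommitInfo, IcebergSinkState> writer =
                sink.createWriter(null); // a real engine supplies its own SinkWriter.Context
        writer.write(new SeaTunnelRow(new Object[] {1L, "books"}));
        writer.close();
    }
}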