From e162b3bd717af6945dad56b931c6679c4fbaaada Mon Sep 17 00:00:00 2001 From: Timothy Brown Date: Sun, 18 Aug 2024 19:59:44 -0500 Subject: [PATCH] Refactor config classes --- .../xtable/conversion/ConversionConfig.java | 50 +++ .../xtable/conversion/ExternalTable.java | 77 +++++ ...HudiSourceConfig.java => SourceTable.java} | 27 +- .../apache/xtable/conversion/TargetTable.java | 47 +++ .../xtable/spi/sync/ConversionTarget.java | 4 +- .../xtable/conversion/TestExternalTable.java | 56 ++++ .../xtable/conversion/TestSourceTable.java | 47 +++ .../xtable/conversion/TestTargetTable.java} | 30 +- .../conversion/ConversionController.java | 25 +- .../conversion/ConversionSourceProvider.java | 13 +- .../conversion/ConversionTargetFactory.java | 16 +- .../xtable/conversion/PerTableConfigImpl.java | 142 -------- .../delta/DeltaConversionSourceProvider.java | 10 +- .../xtable/delta/DeltaConversionTarget.java | 28 +- ...figurationBasedPartitionSpecExtractor.java | 4 +- .../hudi/HudiConversionSourceProvider.java | 11 +- .../xtable/hudi/HudiConversionTarget.java | 20 +- ...eConfigImpl.java => HudiSourceConfig.java} | 32 +- .../iceberg/IcebergConversionSource.java | 33 +- .../IcebergConversionSourceProvider.java | 4 +- .../iceberg/IcebergConversionTarget.java | 22 +- .../apache/xtable/ITConversionController.java | 310 ++++++++++-------- .../conversion/TestConversionConfig.java | 66 ++++ .../conversion/TestConversionController.java | 106 +++--- .../TestConversionTargetFactory.java | 37 +-- .../xtable/conversion/TestPerTableConfig.java | 118 ------- .../delta/ITDeltaConversionTargetSource.java | 95 +++--- .../apache/xtable/delta/TestDeltaSync.java | 14 +- .../hudi/ITHudiConversionSourceSource.java | 2 +- .../hudi/ITHudiConversionSourceTarget.java | 13 +- .../ITIcebergConversionTargetSource.java | 57 ++-- .../TestIcebergConversionTargetSource.java | 22 +- .../xtable/iceberg/TestIcebergSync.java | 14 +- .../org/apache/xtable/loadtest/LoadTest.java | 76 +++-- .../xtable/hudi/sync/XTableSyncConfig.java | 4 +- .../xtable/hudi/sync/XTableSyncTool.java | 59 ++-- .../xtable/hudi/sync/TestXTableSyncTool.java | 2 +- .../org/apache/xtable/utilities/RunSync.java | 64 ++-- 38 files changed, 992 insertions(+), 765 deletions(-) create mode 100644 xtable-api/src/main/java/org/apache/xtable/conversion/ConversionConfig.java create mode 100644 xtable-api/src/main/java/org/apache/xtable/conversion/ExternalTable.java rename xtable-api/src/main/java/org/apache/xtable/conversion/{HudiSourceConfig.java => SourceTable.java} (53%) create mode 100644 xtable-api/src/main/java/org/apache/xtable/conversion/TargetTable.java create mode 100644 xtable-api/src/test/java/org/apache/xtable/conversion/TestExternalTable.java create mode 100644 xtable-api/src/test/java/org/apache/xtable/conversion/TestSourceTable.java rename xtable-api/src/{main/java/org/apache/xtable/conversion/PerTableConfig.java => test/java/org/apache/xtable/conversion/TestTargetTable.java} (65%) delete mode 100644 xtable-core/src/main/java/org/apache/xtable/conversion/PerTableConfigImpl.java rename xtable-core/src/main/java/org/apache/xtable/hudi/{HudiSourceConfigImpl.java => HudiSourceConfig.java} (68%) create mode 100644 xtable-core/src/test/java/org/apache/xtable/conversion/TestConversionConfig.java delete mode 100644 xtable-core/src/test/java/org/apache/xtable/conversion/TestPerTableConfig.java diff --git a/xtable-api/src/main/java/org/apache/xtable/conversion/ConversionConfig.java b/xtable-api/src/main/java/org/apache/xtable/conversion/ConversionConfig.java new file mode 
100644 index 000000000..73e2628db --- /dev/null +++ b/xtable-api/src/main/java/org/apache/xtable/conversion/ConversionConfig.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.xtable.conversion; + +import java.util.List; + +import lombok.Builder; +import lombok.NonNull; +import lombok.Value; + +import com.google.common.base.Preconditions; + +import org.apache.xtable.model.sync.SyncMode; + +@Value +public class ConversionConfig { + // The source of the sync + @NonNull SourceTable sourceTable; + // One or more targets to sync the table metadata to + List targetTables; + // The mode, incremental or snapshot + SyncMode syncMode; + + @Builder + ConversionConfig( + @NonNull SourceTable sourceTable, List targetTables, SyncMode syncMode) { + this.sourceTable = sourceTable; + this.targetTables = targetTables; + Preconditions.checkArgument( + targetTables != null && !targetTables.isEmpty(), + "Please provide at-least one format to sync"); + this.syncMode = syncMode == null ? SyncMode.INCREMENTAL : syncMode; + } +} diff --git a/xtable-api/src/main/java/org/apache/xtable/conversion/ExternalTable.java b/xtable-api/src/main/java/org/apache/xtable/conversion/ExternalTable.java new file mode 100644 index 000000000..939c59c09 --- /dev/null +++ b/xtable-api/src/main/java/org/apache/xtable/conversion/ExternalTable.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.xtable.conversion; + +import java.util.Properties; + +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.NonNull; + +import org.apache.hadoop.fs.Path; + +import com.google.common.base.Preconditions; + +/** Defines a reference to a table in a particular format. */ +@Getter +@EqualsAndHashCode +class ExternalTable { + /** The name of the table. */ + protected final @NonNull String name; + /** The format of the table (e.g. 
DELTA, ICEBERG, HUDI) */ + protected final @NonNull String formatName; + /** The path to the root of the table or the metadata directory depending on the format */ + protected final @NonNull String basePath; + /** Optional namespace for the table */ + protected final String[] namespace; + /** The configuration for interacting with the catalog that manages this table */ + protected final CatalogConfig catalogConfig; + + /** Optional, additional properties that can be used to define interactions with the table */ + protected final Properties additionalProperties; + + ExternalTable( + @NonNull String name, + @NonNull String formatName, + @NonNull String basePath, + String[] namespace, + CatalogConfig catalogConfig, + Properties additionalProperties) { + this.name = name; + this.formatName = formatName; + this.basePath = sanitizeBasePath(basePath); + this.namespace = namespace; + this.catalogConfig = catalogConfig; + this.additionalProperties = additionalProperties; + } + + protected String sanitizeBasePath(String tableBasePath) { + Path path = new Path(tableBasePath); + Preconditions.checkArgument(path.isAbsolute(), "Table base path must be absolute"); + if (path.isAbsoluteAndSchemeAuthorityNull()) { + // assume this is local file system and append scheme + return "file://" + path; + } else if (path.toUri().getScheme().equals("file")) { + // add extra slashes + return "file://" + path.toUri().getPath(); + } else { + return path.toString(); + } + } +} diff --git a/xtable-api/src/main/java/org/apache/xtable/conversion/HudiSourceConfig.java b/xtable-api/src/main/java/org/apache/xtable/conversion/SourceTable.java similarity index 53% rename from xtable-api/src/main/java/org/apache/xtable/conversion/HudiSourceConfig.java rename to xtable-api/src/main/java/org/apache/xtable/conversion/SourceTable.java index 6edf356f7..b37e1c1e2 100644 --- a/xtable-api/src/main/java/org/apache/xtable/conversion/HudiSourceConfig.java +++ b/xtable-api/src/main/java/org/apache/xtable/conversion/SourceTable.java @@ -18,10 +18,29 @@ package org.apache.xtable.conversion; -import org.apache.xtable.spi.extractor.SourcePartitionSpecExtractor; +import java.util.Properties; -public interface HudiSourceConfig { - String getPartitionSpecExtractorClass(); +import lombok.Builder; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.NonNull; - SourcePartitionSpecExtractor loadSourcePartitionSpecExtractor(); +@EqualsAndHashCode(callSuper = true) +@Getter +public class SourceTable extends ExternalTable { + /** The path to the data files, defaults to the metadataPath */ + @NonNull private final String dataPath; + + @Builder(toBuilder = true) + public SourceTable( + String name, + String formatName, + String basePath, + String dataPath, + String[] namespace, + CatalogConfig catalogConfig, + Properties additionalProperties) { + super(name, formatName, basePath, namespace, catalogConfig, additionalProperties); + this.dataPath = dataPath == null ? this.getBasePath() : sanitizeBasePath(dataPath); + } } diff --git a/xtable-api/src/main/java/org/apache/xtable/conversion/TargetTable.java b/xtable-api/src/main/java/org/apache/xtable/conversion/TargetTable.java new file mode 100644 index 000000000..6256da2c6 --- /dev/null +++ b/xtable-api/src/main/java/org/apache/xtable/conversion/TargetTable.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.xtable.conversion; + +import java.time.Duration; +import java.time.temporal.ChronoUnit; +import java.util.Properties; + +import lombok.Builder; +import lombok.EqualsAndHashCode; +import lombok.Getter; + +@Getter +@EqualsAndHashCode(callSuper = true) +public class TargetTable extends ExternalTable { + private final Duration metadataRetention; + + @Builder(toBuilder = true) + public TargetTable( + String name, + String formatName, + String basePath, + String[] namespace, + CatalogConfig catalogConfig, + Duration metadataRetention, + Properties additionalProperties) { + super(name, formatName, basePath, namespace, catalogConfig, additionalProperties); + this.metadataRetention = + metadataRetention == null ? Duration.of(7, ChronoUnit.DAYS) : metadataRetention; + } +} diff --git a/xtable-api/src/main/java/org/apache/xtable/spi/sync/ConversionTarget.java b/xtable-api/src/main/java/org/apache/xtable/spi/sync/ConversionTarget.java index a0df756a7..736f49e42 100644 --- a/xtable-api/src/main/java/org/apache/xtable/spi/sync/ConversionTarget.java +++ b/xtable-api/src/main/java/org/apache/xtable/spi/sync/ConversionTarget.java @@ -23,7 +23,7 @@ import org.apache.hadoop.conf.Configuration; -import org.apache.xtable.conversion.PerTableConfig; +import org.apache.xtable.conversion.TargetTable; import org.apache.xtable.model.InternalTable; import org.apache.xtable.model.metadata.TableSyncMetadata; import org.apache.xtable.model.schema.InternalPartitionField; @@ -89,5 +89,5 @@ public interface ConversionTarget { String getTableFormat(); /** Initializes the client with provided configuration */ - void init(PerTableConfig perTableConfig, Configuration configuration); + void init(TargetTable targetTable, Configuration configuration); } diff --git a/xtable-api/src/test/java/org/apache/xtable/conversion/TestExternalTable.java b/xtable-api/src/test/java/org/apache/xtable/conversion/TestExternalTable.java new file mode 100644 index 000000000..5422b0a7f --- /dev/null +++ b/xtable-api/src/test/java/org/apache/xtable/conversion/TestExternalTable.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.xtable.conversion; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +import org.junit.jupiter.api.Test; + +public class TestExternalTable { + @Test + void sanitizePath() { + ExternalTable tooManySlashes = + new ExternalTable("name", "hudi", "s3://bucket//path", null, null, null); + assertEquals("s3://bucket/path", tooManySlashes.getBasePath()); + + ExternalTable localFilePath = + new ExternalTable("name", "hudi", "/local/data//path", null, null, null); + assertEquals("file:///local/data/path", localFilePath.getBasePath()); + + ExternalTable properLocalFilePath = + new ExternalTable("name", "hudi", "file:///local/data//path", null, null, null); + assertEquals("file:///local/data/path", properLocalFilePath.getBasePath()); + } + + @Test + void errorIfRequiredArgsNotSet() { + assertThrows( + NullPointerException.class, + () -> new ExternalTable("name", "hudi", null, null, null, null)); + + assertThrows( + NullPointerException.class, + () -> new ExternalTable("name", null, "file://bucket/path", null, null, null)); + + assertThrows( + NullPointerException.class, + () -> new ExternalTable(null, "hudi", "file://bucket/path", null, null, null)); + } +} diff --git a/xtable-api/src/test/java/org/apache/xtable/conversion/TestSourceTable.java b/xtable-api/src/test/java/org/apache/xtable/conversion/TestSourceTable.java new file mode 100644 index 000000000..05effaf93 --- /dev/null +++ b/xtable-api/src/test/java/org/apache/xtable/conversion/TestSourceTable.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.xtable.conversion; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import org.junit.jupiter.api.Test; + +class TestSourceTable { + @Test + void dataPathDefaultsToMetadataPath() { + String basePath = "file:///path/to/table"; + SourceTable sourceTable = + SourceTable.builder().name("name").formatName("hudi").basePath(basePath).build(); + assertEquals(basePath, sourceTable.getDataPath()); + } + + @Test + void dataPathIsSanitized() { + String basePath = "file:///path/to/table"; + String dataPath = "file:///path/to/table//data"; + SourceTable sourceTable = + SourceTable.builder() + .name("name") + .formatName("hudi") + .basePath(basePath) + .dataPath(dataPath) + .build(); + assertEquals("file:///path/to/table/data", sourceTable.getDataPath()); + } +} diff --git a/xtable-api/src/main/java/org/apache/xtable/conversion/PerTableConfig.java b/xtable-api/src/test/java/org/apache/xtable/conversion/TestTargetTable.java similarity index 65% rename from xtable-api/src/main/java/org/apache/xtable/conversion/PerTableConfig.java rename to xtable-api/src/test/java/org/apache/xtable/conversion/TestTargetTable.java index 612fc9b38..9faa22c86 100644 --- a/xtable-api/src/main/java/org/apache/xtable/conversion/PerTableConfig.java +++ b/xtable-api/src/test/java/org/apache/xtable/conversion/TestTargetTable.java @@ -18,26 +18,18 @@ package org.apache.xtable.conversion; -import java.util.List; +import static org.junit.jupiter.api.Assertions.assertEquals; -import org.apache.xtable.model.sync.SyncMode; +import java.time.Duration; -public interface PerTableConfig { - int getTargetMetadataRetentionInHours(); +import org.junit.jupiter.api.Test; - String getTableBasePath(); - - String getTableDataPath(); - - String getTableName(); - - HudiSourceConfig getHudiSourceConfig(); - - List getTargetTableFormats(); - - SyncMode getSyncMode(); - - String[] getNamespace(); - - CatalogConfig getIcebergCatalogConfig(); +class TestTargetTable { + @Test + void retentionDefaultsToSevenDays() { + String basePath = "file:///path/to/table"; + TargetTable targetTable = + TargetTable.builder().name("name").formatName("hudi").basePath(basePath).build(); + assertEquals(Duration.ofDays(7), targetTable.getMetadataRetention()); + } } diff --git a/xtable-core/src/main/java/org/apache/xtable/conversion/ConversionController.java b/xtable-core/src/main/java/org/apache/xtable/conversion/ConversionController.java index 349687335..dc6659696 100644 --- a/xtable-core/src/main/java/org/apache/xtable/conversion/ConversionController.java +++ b/xtable-core/src/main/java/org/apache/xtable/conversion/ConversionController.java @@ -26,7 +26,6 @@ import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -52,7 +51,7 @@ /** * Responsible for completing the entire lifecycle of the sync process given {@link - * PerTableConfigImpl}. This is done in three steps, + * ConversionConfig}. This is done in three steps, * *
*
  • 1. Extracting snapshot {@link InternalSnapshot} from the source table format. @@ -72,33 +71,31 @@ public ConversionController(Configuration conf) { } /** - * Runs a sync for the given source table configuration in PerTableConfig. + * Runs a sync for the given source table configuration in ConversionConfig. * * @param config A per table level config containing tableBasePath, partitionFieldSpecConfig, * targetTableFormats and syncMode * @param conversionSourceProvider A provider for the {@link ConversionSource} instance, {@link - * ConversionSourceProvider#init(Configuration, Map)} must be called before calling this - * method. + * ConversionSourceProvider#init(Configuration)} must be called before calling this method. * @return Returns a map containing the table format, and it's sync result. Run sync for a table * with the provided per table level configuration. */ public Map sync( - PerTableConfig config, ConversionSourceProvider conversionSourceProvider) { - if (config.getTargetTableFormats() == null || config.getTargetTableFormats().isEmpty()) { + ConversionConfig config, ConversionSourceProvider conversionSourceProvider) { + if (config.getTargetTables() == null || config.getTargetTables().isEmpty()) { throw new IllegalArgumentException("Please provide at-least one format to sync"); } try (ConversionSource conversionSource = - conversionSourceProvider.getConversionSourceInstance(config)) { + conversionSourceProvider.getConversionSourceInstance(config.getSourceTable())) { ExtractFromSource source = ExtractFromSource.of(conversionSource); Map conversionTargetByFormat = - config.getTargetTableFormats().stream() + config.getTargetTables().stream() .collect( Collectors.toMap( - Function.identity(), - tableFormat -> - conversionTargetFactory.createForFormat(tableFormat, config, conf))); + TargetTable::getFormatName, + targetTable -> conversionTargetFactory.createForFormat(targetTable, conf))); // State for each TableFormat Map> lastSyncMetadataByFormat = conversionTargetByFormat.entrySet().stream() @@ -152,11 +149,11 @@ private static String getFormatsWithStatusCode( } private Map getFormatsToSyncIncrementally( - PerTableConfig perTableConfig, + ConversionConfig conversionConfig, Map conversionTargetByFormat, Map> lastSyncMetadataByFormat, ConversionSource conversionSource) { - if (perTableConfig.getSyncMode() == SyncMode.FULL) { + if (conversionConfig.getSyncMode() == SyncMode.FULL) { // Full sync requested by config, hence no incremental sync. return Collections.emptyMap(); } diff --git a/xtable-core/src/main/java/org/apache/xtable/conversion/ConversionSourceProvider.java b/xtable-core/src/main/java/org/apache/xtable/conversion/ConversionSourceProvider.java index 09cad6dbe..ccd6fde72 100644 --- a/xtable-core/src/main/java/org/apache/xtable/conversion/ConversionSourceProvider.java +++ b/xtable-core/src/main/java/org/apache/xtable/conversion/ConversionSourceProvider.java @@ -18,8 +18,6 @@ package org.apache.xtable.conversion; -import java.util.Map; - import org.apache.hadoop.conf.Configuration; import org.apache.xtable.spi.extractor.ConversionSource; @@ -33,16 +31,9 @@ public abstract class ConversionSourceProvider { /** The Hadoop configuration to use when reading from the source table. */ protected Configuration hadoopConf; - /** The configuration for the source. */ - protected Map sourceConf; - - /** The configuration for the table to read from. */ - protected PerTableConfig sourceTableConfig; - /** Initializes the provider various source specific configurations. 
*/ - public void init(Configuration hadoopConf, Map sourceConf) { + public void init(Configuration hadoopConf) { this.hadoopConf = hadoopConf; - this.sourceConf = sourceConf; } /** @@ -54,5 +45,5 @@ public void init(Configuration hadoopConf, Map sourceConf) { * @return the conversion source */ public abstract ConversionSource getConversionSourceInstance( - PerTableConfig sourceTableConfig); + SourceTable sourceTableConfig); } diff --git a/xtable-core/src/main/java/org/apache/xtable/conversion/ConversionTargetFactory.java b/xtable-core/src/main/java/org/apache/xtable/conversion/ConversionTargetFactory.java index 68e85b4dc..8e3558976 100644 --- a/xtable-core/src/main/java/org/apache/xtable/conversion/ConversionTargetFactory.java +++ b/xtable-core/src/main/java/org/apache/xtable/conversion/ConversionTargetFactory.java @@ -38,19 +38,17 @@ public static ConversionTargetFactory getInstance() { /** * Create a fully initialized instance of the ConversionTarget represented by the given Table - * Format name. Initialization is done with the config provided through PerTableConfig and + * Format name. Initialization is done with the config provided through TargetTable and * Configuration params. * - * @param tableFormat - * @param perTableConfig - * @param configuration - * @return + * @param targetTable the spec of the target + * @param configuration hadoop configuration + * @return an intialized {@link ConversionTarget} */ - public ConversionTarget createForFormat( - String tableFormat, PerTableConfig perTableConfig, Configuration configuration) { - ConversionTarget conversionTarget = createConversionTargetForName(tableFormat); + public ConversionTarget createForFormat(TargetTable targetTable, Configuration configuration) { + ConversionTarget conversionTarget = createConversionTargetForName(targetTable.getFormatName()); - conversionTarget.init(perTableConfig, configuration); + conversionTarget.init(targetTable, configuration); return conversionTarget; } diff --git a/xtable-core/src/main/java/org/apache/xtable/conversion/PerTableConfigImpl.java b/xtable-core/src/main/java/org/apache/xtable/conversion/PerTableConfigImpl.java deleted file mode 100644 index f34402b16..000000000 --- a/xtable-core/src/main/java/org/apache/xtable/conversion/PerTableConfigImpl.java +++ /dev/null @@ -1,142 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.xtable.conversion; - -import java.util.List; - -import javax.annotation.Nonnull; - -import lombok.Builder; -import lombok.NonNull; -import lombok.Value; - -import org.apache.hadoop.fs.Path; - -import com.google.common.base.Preconditions; - -import org.apache.xtable.hudi.HudiSourceConfigImpl; -import org.apache.xtable.iceberg.IcebergCatalogConfig; -import org.apache.xtable.model.sync.SyncMode; - -/** Represents input configuration to the sync process. */ -@Value -public class PerTableConfigImpl implements PerTableConfig { - /** table base path in local file system or HDFS or object stores like S3, GCS etc. */ - @Nonnull String tableBasePath; - /** the base path for the data folder, defaults to the tableBasePath if not specified */ - @Nonnull String tableDataPath; - - /** The name of the table */ - @Nonnull String tableName; - - /** The namespace of the table (optional) */ - String[] namespace; - - /** - * HudiSourceConfig is a config that allows us to infer partition values for hoodie source tables. - * If the table is not partitioned, leave it blank. If it is partitioned, you can specify a spec - * with a comma separated list with format path:type:format. - * - *

- *   • partitionSpecExtractorClass: class to extract partition fields from the given
- *       spec.ConfigurationBasedPartitionSpecExtractor is the default class
- *   • partitionFieldSpecConfig: path:type:format spec to infer partition values
- *     • path: is a dot separated path to the partition field
- *     • type: describes how the partition value was generated from the column value
- *       • VALUE: an identity transform of field value to partition value
- *       • YEAR: data is partitioned by a field representing a date and year granularity is used
- *       • MONTH: same as YEAR but with month granularity
- *       • DAY: same as YEAR but with day granularity
- *       • HOUR: same as YEAR but with hour granularity
- *     • format: if your partition type is YEAR, MONTH, DAY, or HOUR specify the format for
- *         the date string as it appears in your file paths
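For illustration only, a minimal sketch of how a path:type:format spec like the one described above could be supplied through the property-based HudiSourceConfig introduced in this patch; the column name, date format, table name, and base path below are made-up placeholders rather than values from the patch:

import java.util.Properties;

import org.apache.xtable.conversion.SourceTable;
import org.apache.xtable.hudi.HudiSourceConfig;
import org.apache.xtable.model.storage.TableFormat;

class PartitionFieldSpecExample {
  static SourceTable hudiSourceWithPartitionSpec() {
    // Hypothetical spec: partition by the "created_date" column at MONTH granularity,
    // with "yyyy-MM" as the date format used in the file paths.
    Properties props = new Properties();
    props.setProperty(HudiSourceConfig.PARTITION_FIELD_SPEC_CONFIG, "created_date:MONTH:yyyy-MM");
    return SourceTable.builder()
        .name("placeholder_table") // made-up table name
        .formatName(TableFormat.HUDI)
        .basePath("file:///tmp/placeholder_table") // made-up path
        .additionalProperties(props)
        .build();
  }
}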
    - */ - @Nonnull HudiSourceConfigImpl hudiSourceConfig; - - /** List of table formats to sync. */ - @Nonnull List targetTableFormats; - - /** Configuration options for integrating with an existing Iceberg Catalog (optional) */ - IcebergCatalogConfig icebergCatalogConfig; - - /** - * Mode of a sync. FULL is only supported right now. - * - *
- *   • FULL: Full sync will create a checkpoint of ALL the files relevant at a certain point in
- *       time
- *   • INCREMENTAL: Incremental will sync differential structures to bring the table state from
- *       and to points in the timeline
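As a rough, non-authoritative sketch of how the replacement classes fit together: the ConversionConfig introduced by this patch wires a SourceTable, one or more TargetTables, and an optional SyncMode (defaulting to INCREMENTAL when omitted); the table name and base paths below are placeholders:

import java.util.Collections;

import org.apache.xtable.conversion.ConversionConfig;
import org.apache.xtable.conversion.SourceTable;
import org.apache.xtable.conversion.TargetTable;
import org.apache.xtable.model.storage.TableFormat;
import org.apache.xtable.model.sync.SyncMode;

class ConversionConfigSketch {
  static ConversionConfig fullSync() {
    SourceTable source =
        SourceTable.builder()
            .name("placeholder_table") // made-up name
            .formatName(TableFormat.HUDI)
            .basePath("file:///tmp/placeholder_table") // made-up path
            .build();
    TargetTable target =
        TargetTable.builder()
            .name("placeholder_table")
            .formatName(TableFormat.ICEBERG)
            .basePath("file:///tmp/placeholder_table")
            .build();
    return ConversionConfig.builder()
        .sourceTable(source)
        .targetTables(Collections.singletonList(target))
        .syncMode(SyncMode.FULL) // omit this line to fall back to SyncMode.INCREMENTAL
        .build();
  }
}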
    - */ - @Nonnull SyncMode syncMode; - - /** - * The retention for metadata or versions of the table in the target systems to bound the size of - * any metadata tracked in the target system. Specified in hours. - */ - int targetMetadataRetentionInHours; - - @Builder - PerTableConfigImpl( - @NonNull String tableBasePath, - String tableDataPath, - @NonNull String tableName, - String[] namespace, - HudiSourceConfigImpl hudiSourceConfig, - @NonNull List targetTableFormats, - IcebergCatalogConfig icebergCatalogConfig, - SyncMode syncMode, - Integer targetMetadataRetentionInHours) { - // sanitize source path - this.tableBasePath = sanitizeBasePath(tableBasePath); - this.tableDataPath = tableDataPath == null ? tableBasePath : sanitizeBasePath(tableDataPath); - this.tableName = tableName; - this.namespace = namespace; - this.hudiSourceConfig = - hudiSourceConfig == null ? HudiSourceConfigImpl.builder().build() : hudiSourceConfig; - Preconditions.checkArgument( - targetTableFormats.size() > 0, "Please provide at-least one format to sync"); - this.targetTableFormats = targetTableFormats; - this.icebergCatalogConfig = icebergCatalogConfig; - this.syncMode = syncMode == null ? SyncMode.INCREMENTAL : syncMode; - this.targetMetadataRetentionInHours = - targetMetadataRetentionInHours == null ? 24 * 7 : targetMetadataRetentionInHours; - } - - private String sanitizeBasePath(String tableBasePath) { - Path path = new Path(tableBasePath); - Preconditions.checkArgument(path.isAbsolute(), "Table base path must be absolute"); - if (path.isAbsoluteAndSchemeAuthorityNull()) { - // assume this is local file system and append scheme - return "file://" + path; - } else if (path.toUri().getScheme().equals("file")) { - // add extra slashes - return "file://" + path.toUri().getPath(); - } else { - return path.toString(); - } - } -} diff --git a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionSourceProvider.java b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionSourceProvider.java index f42425db8..045e2b724 100644 --- a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionSourceProvider.java +++ b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionSourceProvider.java @@ -23,18 +23,18 @@ import io.delta.tables.DeltaTable; import org.apache.xtable.conversion.ConversionSourceProvider; -import org.apache.xtable.conversion.PerTableConfig; +import org.apache.xtable.conversion.SourceTable; /** A concrete implementation of {@link ConversionSourceProvider} for Delta Lake table format. 
*/ public class DeltaConversionSourceProvider extends ConversionSourceProvider { @Override - public DeltaConversionSource getConversionSourceInstance(PerTableConfig perTableConfig) { + public DeltaConversionSource getConversionSourceInstance(SourceTable sourceTable) { SparkSession sparkSession = DeltaConversionUtils.buildSparkSession(hadoopConf); - DeltaTable deltaTable = DeltaTable.forPath(sparkSession, perTableConfig.getTableBasePath()); + DeltaTable deltaTable = DeltaTable.forPath(sparkSession, sourceTable.getBasePath()); return DeltaConversionSource.builder() .sparkSession(sparkSession) - .tableName(perTableConfig.getTableName()) - .basePath(perTableConfig.getTableBasePath()) + .tableName(sourceTable.getName()) + .basePath(sourceTable.getBasePath()) .deltaTable(deltaTable) .deltaLog(deltaTable.deltaLog()) .build(); diff --git a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionTarget.java b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionTarget.java index 120ee0e05..b34fa4491 100644 --- a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionTarget.java +++ b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionTarget.java @@ -54,7 +54,7 @@ import com.google.common.annotations.VisibleForTesting; -import org.apache.xtable.conversion.PerTableConfig; +import org.apache.xtable.conversion.TargetTable; import org.apache.xtable.exception.NotSupportedException; import org.apache.xtable.model.InternalTable; import org.apache.xtable.model.metadata.TableSyncMetadata; @@ -76,16 +76,16 @@ public class DeltaConversionTarget implements ConversionTarget { private DeltaDataFileUpdatesExtractor dataFileUpdatesExtractor; private String tableName; - private int logRetentionInHours; + private long logRetentionInHours; private TransactionState transactionState; public DeltaConversionTarget() {} - public DeltaConversionTarget(PerTableConfig perTableConfig, SparkSession sparkSession) { + public DeltaConversionTarget(TargetTable targetTable, SparkSession sparkSession) { this( - perTableConfig.getTableDataPath(), - perTableConfig.getTableName(), - perTableConfig.getTargetMetadataRetentionInHours(), + targetTable.getBasePath(), + targetTable.getName(), + targetTable.getMetadataRetention().toHours(), sparkSession, DeltaSchemaExtractor.getInstance(), DeltaPartitionExtractor.getInstance(), @@ -96,7 +96,7 @@ public DeltaConversionTarget(PerTableConfig perTableConfig, SparkSession sparkSe DeltaConversionTarget( String tableDataPath, String tableName, - int logRetentionInHours, + long logRetentionInHours, SparkSession sparkSession, DeltaSchemaExtractor schemaExtractor, DeltaPartitionExtractor partitionExtractor, @@ -115,7 +115,7 @@ public DeltaConversionTarget(PerTableConfig perTableConfig, SparkSession sparkSe private void _init( String tableDataPath, String tableName, - int logRetentionInHours, + long logRetentionInHours, SparkSession sparkSession, DeltaSchemaExtractor schemaExtractor, DeltaPartitionExtractor partitionExtractor, @@ -134,13 +134,13 @@ private void _init( } @Override - public void init(PerTableConfig perTableConfig, Configuration configuration) { + public void init(TargetTable targetTable, Configuration configuration) { SparkSession sparkSession = DeltaConversionUtils.buildSparkSession(configuration); _init( - perTableConfig.getTableDataPath(), - perTableConfig.getTableName(), - perTableConfig.getTargetMetadataRetentionInHours(), + targetTable.getBasePath(), + targetTable.getName(), + targetTable.getMetadataRetention().toHours(), sparkSession, 
DeltaSchemaExtractor.getInstance(), DeltaPartitionExtractor.getInstance(), @@ -221,7 +221,7 @@ private class TransactionState { private final OptimisticTransaction transaction; private final Instant commitTime; private final DeltaLog deltaLog; - private final int retentionInHours; + private final long retentionInHours; @Getter private final List partitionColumns; private final String tableName; @Getter private StructType latestSchema; @@ -230,7 +230,7 @@ private class TransactionState { @Setter private Seq actions; private TransactionState( - DeltaLog deltaLog, String tableName, Instant latestCommitTime, int retentionInHours) { + DeltaLog deltaLog, String tableName, Instant latestCommitTime, long retentionInHours) { this.deltaLog = deltaLog; this.transaction = deltaLog.startTransaction(); this.latestSchema = deltaLog.snapshot().schema(); diff --git a/xtable-core/src/main/java/org/apache/xtable/hudi/ConfigurationBasedPartitionSpecExtractor.java b/xtable-core/src/main/java/org/apache/xtable/hudi/ConfigurationBasedPartitionSpecExtractor.java index e3659f19d..7bc41a109 100644 --- a/xtable-core/src/main/java/org/apache/xtable/hudi/ConfigurationBasedPartitionSpecExtractor.java +++ b/xtable-core/src/main/java/org/apache/xtable/hudi/ConfigurationBasedPartitionSpecExtractor.java @@ -36,13 +36,13 @@ */ @AllArgsConstructor public class ConfigurationBasedPartitionSpecExtractor implements HudiSourcePartitionSpecExtractor { - private final HudiSourceConfigImpl config; + private final HudiSourceConfig config; @Override public List spec(InternalSchema tableSchema) { List partitionFields = new ArrayList<>(config.getPartitionFieldSpecs().size()); - for (HudiSourceConfigImpl.PartitionFieldSpec fieldSpec : config.getPartitionFieldSpecs()) { + for (HudiSourceConfig.PartitionFieldSpec fieldSpec : config.getPartitionFieldSpecs()) { InternalField sourceField = SchemaFieldFinder.getInstance() .findFieldByPath(tableSchema, fieldSpec.getSourceFieldPath()); diff --git a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiConversionSourceProvider.java b/xtable-core/src/main/java/org/apache/xtable/hudi/HudiConversionSourceProvider.java index f6e5e28d1..0ddbbcb76 100644 --- a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiConversionSourceProvider.java +++ b/xtable-core/src/main/java/org/apache/xtable/hudi/HudiConversionSourceProvider.java @@ -25,19 +25,18 @@ import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.xtable.conversion.ConversionSourceProvider; -import org.apache.xtable.conversion.PerTableConfig; +import org.apache.xtable.conversion.SourceTable; /** A concrete implementation of {@link ConversionSourceProvider} for Hudi table format. 
*/ @Log4j2 public class HudiConversionSourceProvider extends ConversionSourceProvider { @Override - public HudiConversionSource getConversionSourceInstance(PerTableConfig sourceTableConfig) { - this.sourceTableConfig = sourceTableConfig; + public HudiConversionSource getConversionSourceInstance(SourceTable sourceTable) { HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder() .setConf(hadoopConf) - .setBasePath(this.sourceTableConfig.getTableBasePath()) + .setBasePath(sourceTable.getBasePath()) .setLoadActiveTimelineOnLoad(true) .build(); if (!metaClient.getTableConfig().getTableType().equals(HoodieTableType.COPY_ON_WRITE)) { @@ -45,8 +44,8 @@ public HudiConversionSource getConversionSourceInstance(PerTableConfig sourceTab } final HudiSourcePartitionSpecExtractor sourcePartitionSpecExtractor = - (HudiSourcePartitionSpecExtractor) - sourceTableConfig.getHudiSourceConfig().loadSourcePartitionSpecExtractor(); + HudiSourceConfig.fromProperties(sourceTable.getAdditionalProperties()) + .loadSourcePartitionSpecExtractor(); return new HudiConversionSource(metaClient, sourcePartitionSpecExtractor); } diff --git a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiConversionTarget.java b/xtable-core/src/main/java/org/apache/xtable/hudi/HudiConversionTarget.java index b1a0bc911..c3ef6f922 100644 --- a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiConversionTarget.java +++ b/xtable-core/src/main/java/org/apache/xtable/hudi/HudiConversionTarget.java @@ -78,7 +78,7 @@ import com.google.common.annotations.VisibleForTesting; import org.apache.xtable.avro.AvroSchemaConverter; -import org.apache.xtable.conversion.PerTableConfig; +import org.apache.xtable.conversion.TargetTable; import org.apache.xtable.exception.NotSupportedException; import org.apache.xtable.exception.ReadException; import org.apache.xtable.exception.UpdateException; @@ -108,16 +108,15 @@ public HudiConversionTarget() {} @VisibleForTesting HudiConversionTarget( - PerTableConfig perTableConfig, + TargetTable targetTable, Configuration configuration, int maxNumDeltaCommitsBeforeCompaction) { this( - perTableConfig.getTableDataPath(), - perTableConfig.getTargetMetadataRetentionInHours(), + targetTable.getBasePath(), + (int) targetTable.getMetadataRetention().toHours(), maxNumDeltaCommitsBeforeCompaction, BaseFileUpdatesExtractor.of( - new HoodieJavaEngineContext(configuration), - new CachingPath(perTableConfig.getTableDataPath())), + new HoodieJavaEngineContext(configuration), new CachingPath(targetTable.getBasePath())), AvroSchemaConverter.getInstance(), HudiTableManager.of(configuration), CommitState::new); @@ -163,14 +162,13 @@ private void _init( } @Override - public void init(PerTableConfig perTableConfig, Configuration configuration) { + public void init(TargetTable targetTable, Configuration configuration) { _init( - perTableConfig.getTableDataPath(), - perTableConfig.getTargetMetadataRetentionInHours(), + targetTable.getBasePath(), + (int) targetTable.getMetadataRetention().toHours(), HoodieMetadataConfig.COMPACT_NUM_DELTA_COMMITS.defaultValue(), BaseFileUpdatesExtractor.of( - new HoodieJavaEngineContext(configuration), - new CachingPath(perTableConfig.getTableDataPath())), + new HoodieJavaEngineContext(configuration), new CachingPath(targetTable.getBasePath())), AvroSchemaConverter.getInstance(), HudiTableManager.of(configuration), CommitState::new); diff --git a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiSourceConfigImpl.java b/xtable-core/src/main/java/org/apache/xtable/hudi/HudiSourceConfig.java 
similarity index 68% rename from xtable-core/src/main/java/org/apache/xtable/hudi/HudiSourceConfigImpl.java rename to xtable-core/src/main/java/org/apache/xtable/hudi/HudiSourceConfig.java index 0fa8b5294..606b92811 100644 --- a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiSourceConfigImpl.java +++ b/xtable-core/src/main/java/org/apache/xtable/hudi/HudiSourceConfig.java @@ -22,29 +22,41 @@ import java.util.Collections; import java.util.List; import java.util.Locale; +import java.util.Properties; -import lombok.Builder; import lombok.Value; import com.google.common.base.Preconditions; -import org.apache.xtable.conversion.HudiSourceConfig; import org.apache.xtable.model.schema.PartitionTransformType; import org.apache.xtable.reflection.ReflectionUtils; /** Configuration of Hudi source format for the sync process. */ @Value -public class HudiSourceConfigImpl implements HudiSourceConfig { +public class HudiSourceConfig { + public static final String PARTITION_SPEC_EXTRACTOR_CLASS = + "xtable.hudi.source.partition_spec_extractor_class"; + public static final String PARTITION_FIELD_SPEC_CONFIG = + "xtable.hudi.source.partition_field_spec_config"; + String partitionSpecExtractorClass; List partitionFieldSpecs; - @Builder - public HudiSourceConfigImpl(String partitionSpecExtractorClass, String partitionFieldSpecConfig) { - this.partitionSpecExtractorClass = - partitionSpecExtractorClass == null - ? ConfigurationBasedPartitionSpecExtractor.class.getName() - : partitionSpecExtractorClass; - this.partitionFieldSpecs = parsePartitionFieldSpecs(partitionFieldSpecConfig); + public static HudiSourceConfig fromPartitionFieldSpecConfig(String partitionFieldSpecConfig) { + return new HudiSourceConfig( + ConfigurationBasedPartitionSpecExtractor.class.getName(), + parsePartitionFieldSpecs(partitionFieldSpecConfig)); + } + + public static HudiSourceConfig fromProperties(Properties properties) { + String partitionSpecExtractorClass = + properties.getProperty( + PARTITION_SPEC_EXTRACTOR_CLASS, + ConfigurationBasedPartitionSpecExtractor.class.getName()); + String partitionFieldSpecString = properties.getProperty(PARTITION_FIELD_SPEC_CONFIG); + List partitionFieldSpecs = + parsePartitionFieldSpecs(partitionFieldSpecString); + return new HudiSourceConfig(partitionSpecExtractorClass, partitionFieldSpecs); } @Value diff --git a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionSource.java b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionSource.java index 7b26ee865..f96ec7149 100644 --- a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionSource.java +++ b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionSource.java @@ -20,23 +20,35 @@ import java.io.IOException; import java.time.Instant; -import java.util.*; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Set; import java.util.stream.Collectors; import java.util.stream.StreamSupport; -import lombok.*; +import lombok.AccessLevel; +import lombok.Builder; +import lombok.Getter; +import lombok.NonNull; import lombok.extern.log4j.Log4j2; import org.apache.hadoop.conf.Configuration; -import org.apache.iceberg.*; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Snapshot; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableScan; import org.apache.iceberg.catalog.Namespace; import 
org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.hadoop.HadoopFileIO; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.io.FileIO; -import org.apache.xtable.conversion.PerTableConfig; +import org.apache.xtable.conversion.SourceTable; import org.apache.xtable.exception.ReadException; import org.apache.xtable.model.CommitsBacklog; import org.apache.xtable.model.InstantsForIncrementalSync; @@ -46,15 +58,18 @@ import org.apache.xtable.model.schema.InternalPartitionField; import org.apache.xtable.model.schema.InternalSchema; import org.apache.xtable.model.stat.PartitionValue; -import org.apache.xtable.model.storage.*; +import org.apache.xtable.model.storage.DataFilesDiff; +import org.apache.xtable.model.storage.DataLayoutStrategy; import org.apache.xtable.model.storage.InternalDataFile; +import org.apache.xtable.model.storage.PartitionFileGroup; +import org.apache.xtable.model.storage.TableFormat; import org.apache.xtable.spi.extractor.ConversionSource; @Log4j2 @Builder public class IcebergConversionSource implements ConversionSource { @NonNull private final Configuration hadoopConf; - @NonNull private final PerTableConfig sourceTableConfig; + @NonNull private final SourceTable sourceTableConfig; @Getter(lazy = true, value = AccessLevel.PACKAGE) private final Table sourceTable = initSourceTable(); @@ -73,15 +88,15 @@ public class IcebergConversionSource implements ConversionSource { private Table initSourceTable() { IcebergTableManager tableManager = IcebergTableManager.of(hadoopConf); String[] namespace = sourceTableConfig.getNamespace(); - String tableName = sourceTableConfig.getTableName(); + String tableName = sourceTableConfig.getName(); TableIdentifier tableIdentifier = namespace == null ? TableIdentifier.of(tableName) : TableIdentifier.of(Namespace.of(namespace), tableName); return tableManager.getTable( - (IcebergCatalogConfig) sourceTableConfig.getIcebergCatalogConfig(), + (IcebergCatalogConfig) sourceTableConfig.getCatalogConfig(), tableIdentifier, - sourceTableConfig.getTableBasePath()); + sourceTableConfig.getBasePath()); } private FileIO initTableOps() { diff --git a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionSourceProvider.java b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionSourceProvider.java index 9a2bb8403..449ebe5d3 100644 --- a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionSourceProvider.java +++ b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionSourceProvider.java @@ -21,12 +21,12 @@ import org.apache.iceberg.Snapshot; import org.apache.xtable.conversion.ConversionSourceProvider; -import org.apache.xtable.conversion.PerTableConfig; +import org.apache.xtable.conversion.SourceTable; /** A concrete implementation of {@link ConversionSourceProvider} for Hudi table format. 
*/ public class IcebergConversionSourceProvider extends ConversionSourceProvider { @Override - public IcebergConversionSource getConversionSourceInstance(PerTableConfig sourceTableConfig) { + public IcebergConversionSource getConversionSourceInstance(SourceTable sourceTableConfig) { return IcebergConversionSource.builder() .sourceTableConfig(sourceTableConfig) .hadoopConf(hadoopConf) diff --git a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionTarget.java b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionTarget.java index 72125cb01..ecdbfa261 100644 --- a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionTarget.java +++ b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionTarget.java @@ -39,7 +39,7 @@ import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.exceptions.NotFoundException; -import org.apache.xtable.conversion.PerTableConfig; +import org.apache.xtable.conversion.TargetTable; import org.apache.xtable.model.InternalTable; import org.apache.xtable.model.metadata.TableSyncMetadata; import org.apache.xtable.model.schema.InternalPartitionField; @@ -70,7 +70,7 @@ public class IcebergConversionTarget implements ConversionTarget { public IcebergConversionTarget() {} IcebergConversionTarget( - PerTableConfig perTableConfig, + TargetTable targetTable, Configuration configuration, IcebergSchemaExtractor schemaExtractor, IcebergSchemaSync schemaSync, @@ -79,7 +79,7 @@ public IcebergConversionTarget() {} IcebergDataFileUpdatesSync dataFileUpdatesExtractor, IcebergTableManager tableManager) { _init( - perTableConfig, + targetTable, configuration, schemaExtractor, schemaSync, @@ -90,7 +90,7 @@ public IcebergConversionTarget() {} } private void _init( - PerTableConfig perTableConfig, + TargetTable targetTable, Configuration configuration, IcebergSchemaExtractor schemaExtractor, IcebergSchemaSync schemaSync, @@ -103,17 +103,17 @@ private void _init( this.partitionSpecExtractor = partitionSpecExtractor; this.partitionSpecSync = partitionSpecSync; this.dataFileUpdatesExtractor = dataFileUpdatesExtractor; - String tableName = perTableConfig.getTableName(); - this.basePath = perTableConfig.getTableBasePath(); + String tableName = targetTable.getName(); + this.basePath = targetTable.getBasePath(); this.configuration = configuration; - this.snapshotRetentionInHours = perTableConfig.getTargetMetadataRetentionInHours(); - String[] namespace = perTableConfig.getNamespace(); + this.snapshotRetentionInHours = (int) targetTable.getMetadataRetention().toHours(); + String[] namespace = targetTable.getNamespace(); this.tableIdentifier = namespace == null ? 
TableIdentifier.of(tableName) : TableIdentifier.of(Namespace.of(namespace), tableName); this.tableManager = tableManager; - this.catalogConfig = (IcebergCatalogConfig) perTableConfig.getIcebergCatalogConfig(); + this.catalogConfig = (IcebergCatalogConfig) targetTable.getCatalogConfig(); if (tableManager.tableExists(catalogConfig, tableIdentifier, basePath)) { // Load the table state if it already exists @@ -124,9 +124,9 @@ private void _init( } @Override - public void init(PerTableConfig perTableConfig, Configuration configuration) { + public void init(TargetTable targetTable, Configuration configuration) { _init( - perTableConfig, + targetTable, configuration, IcebergSchemaExtractor.getInstance(), IcebergSchemaSync.getInstance(), diff --git a/xtable-core/src/test/java/org/apache/xtable/ITConversionController.java b/xtable-core/src/test/java/org/apache/xtable/ITConversionController.java index 443384949..58f0f9828 100644 --- a/xtable-core/src/test/java/org/apache/xtable/ITConversionController.java +++ b/xtable-core/src/test/java/org/apache/xtable/ITConversionController.java @@ -19,6 +19,7 @@ package org.apache.xtable; import static org.apache.xtable.GenericTable.getTableName; +import static org.apache.xtable.hudi.HudiSourceConfig.PARTITION_FIELD_SPEC_CONFIG; import static org.apache.xtable.hudi.HudiTestUtil.PartitionConfig; import static org.apache.xtable.model.storage.TableFormat.DELTA; import static org.apache.xtable.model.storage.TableFormat.HUDI; @@ -30,6 +31,7 @@ import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; +import java.time.Duration; import java.time.Instant; import java.time.ZoneId; import java.time.format.DateTimeFormatter; @@ -41,6 +43,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Properties; import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -81,13 +84,13 @@ import com.google.common.collect.ImmutableList; +import org.apache.xtable.conversion.ConversionConfig; import org.apache.xtable.conversion.ConversionController; import org.apache.xtable.conversion.ConversionSourceProvider; -import org.apache.xtable.conversion.PerTableConfig; -import org.apache.xtable.conversion.PerTableConfigImpl; +import org.apache.xtable.conversion.SourceTable; +import org.apache.xtable.conversion.TargetTable; import org.apache.xtable.delta.DeltaConversionSourceProvider; import org.apache.xtable.hudi.HudiConversionSourceProvider; -import org.apache.xtable.hudi.HudiSourceConfigImpl; import org.apache.xtable.hudi.HudiTestUtil; import org.apache.xtable.iceberg.IcebergConversionSourceProvider; import org.apache.xtable.model.storage.TableFormat; @@ -147,17 +150,17 @@ private ConversionSourceProvider getConversionSourceProvider(String sourceTab if (sourceTableFormat.equalsIgnoreCase(HUDI)) { ConversionSourceProvider hudiConversionSourceProvider = new HudiConversionSourceProvider(); - hudiConversionSourceProvider.init(jsc.hadoopConfiguration(), Collections.emptyMap()); + hudiConversionSourceProvider.init(jsc.hadoopConfiguration()); return hudiConversionSourceProvider; } else if (sourceTableFormat.equalsIgnoreCase(DELTA)) { ConversionSourceProvider deltaConversionSourceProvider = new DeltaConversionSourceProvider(); - deltaConversionSourceProvider.init(jsc.hadoopConfiguration(), Collections.emptyMap()); + deltaConversionSourceProvider.init(jsc.hadoopConfiguration()); return deltaConversionSourceProvider; } else if (sourceTableFormat.equalsIgnoreCase(ICEBERG)) { 
ConversionSourceProvider icebergConversionSourceProvider = new IcebergConversionSourceProvider(); - icebergConversionSourceProvider.init(jsc.hadoopConfiguration(), Collections.emptyMap()); + icebergConversionSourceProvider.init(jsc.hadoopConfiguration()); return icebergConversionSourceProvider; } else { throw new IllegalArgumentException("Unsupported source format: " + sourceTableFormat); @@ -193,27 +196,26 @@ public void testVariousOperations( tableName, tempDir, sparkSession, jsc, sourceTableFormat, isPartitioned)) { insertRecords = table.insertRows(100); - PerTableConfig perTableConfig = - PerTableConfigImpl.builder() - .tableName(tableName) - .targetTableFormats(targetTableFormats) - .tableBasePath(table.getBasePath()) - .tableDataPath(table.getDataPath()) - .hudiSourceConfig( - HudiSourceConfigImpl.builder().partitionFieldSpecConfig(partitionConfig).build()) - .syncMode(syncMode) - .build(); - conversionController.sync(perTableConfig, conversionSourceProvider); + ConversionConfig conversionConfig = + getTableSyncConfig( + sourceTableFormat, + syncMode, + tableName, + table, + targetTableFormats, + partitionConfig, + null); + conversionController.sync(conversionConfig, conversionSourceProvider); checkDatasetEquivalence(sourceTableFormat, table, targetTableFormats, 100); // make multiple commits and then sync table.insertRows(100); table.upsertRows(insertRecords.subList(0, 20)); - conversionController.sync(perTableConfig, conversionSourceProvider); + conversionController.sync(conversionConfig, conversionSourceProvider); checkDatasetEquivalence(sourceTableFormat, table, targetTableFormats, 200); table.deleteRows(insertRecords.subList(30, 50)); - conversionController.sync(perTableConfig, conversionSourceProvider); + conversionController.sync(conversionConfig, conversionSourceProvider); checkDatasetEquivalence(sourceTableFormat, table, targetTableFormats, 180); checkDatasetEquivalenceWithFilter( sourceTableFormat, table, targetTableFormats, table.getFilterQuery()); @@ -222,39 +224,38 @@ public void testVariousOperations( try (GenericTable tableWithUpdatedSchema = GenericTable.getInstanceWithAdditionalColumns( tableName, tempDir, sparkSession, jsc, sourceTableFormat, isPartitioned)) { - PerTableConfig perTableConfig = - PerTableConfigImpl.builder() - .tableName(tableName) - .targetTableFormats(targetTableFormats) - .tableBasePath(tableWithUpdatedSchema.getBasePath()) - .tableDataPath(tableWithUpdatedSchema.getDataPath()) - .hudiSourceConfig( - HudiSourceConfigImpl.builder().partitionFieldSpecConfig(partitionConfig).build()) - .syncMode(syncMode) - .build(); + ConversionConfig conversionConfig = + getTableSyncConfig( + sourceTableFormat, + syncMode, + tableName, + tableWithUpdatedSchema, + targetTableFormats, + partitionConfig, + null); List insertsAfterSchemaUpdate = tableWithUpdatedSchema.insertRows(100); tableWithUpdatedSchema.reload(); - conversionController.sync(perTableConfig, conversionSourceProvider); + conversionController.sync(conversionConfig, conversionSourceProvider); checkDatasetEquivalence(sourceTableFormat, tableWithUpdatedSchema, targetTableFormats, 280); tableWithUpdatedSchema.deleteRows(insertsAfterSchemaUpdate.subList(60, 90)); - conversionController.sync(perTableConfig, conversionSourceProvider); + conversionController.sync(conversionConfig, conversionSourceProvider); checkDatasetEquivalence(sourceTableFormat, tableWithUpdatedSchema, targetTableFormats, 250); if (isPartitioned) { // Adds new partition. 
tableWithUpdatedSchema.insertRecordsForSpecialPartition(50); - conversionController.sync(perTableConfig, conversionSourceProvider); + conversionController.sync(conversionConfig, conversionSourceProvider); checkDatasetEquivalence(sourceTableFormat, tableWithUpdatedSchema, targetTableFormats, 300); // Drops partition. tableWithUpdatedSchema.deleteSpecialPartition(); - conversionController.sync(perTableConfig, conversionSourceProvider); + conversionController.sync(conversionConfig, conversionSourceProvider); checkDatasetEquivalence(sourceTableFormat, tableWithUpdatedSchema, targetTableFormats, 250); // Insert records to the dropped partition again. tableWithUpdatedSchema.insertRecordsForSpecialPartition(50); - conversionController.sync(perTableConfig, conversionSourceProvider); + conversionController.sync(conversionConfig, conversionSourceProvider); checkDatasetEquivalence(sourceTableFormat, tableWithUpdatedSchema, targetTableFormats, 300); } } @@ -279,24 +280,22 @@ public void testConcurrentInsertWritesInSource( String commitInstant2 = table.startCommit(); table.insertRecordsWithCommitAlreadyStarted(insertsForCommit2, commitInstant2, true); - PerTableConfig perTableConfig = - PerTableConfigImpl.builder() - .tableName(tableName) - .targetTableFormats(targetTableFormats) - .tableBasePath(table.getBasePath()) - .hudiSourceConfig( - HudiSourceConfigImpl.builder() - .partitionFieldSpecConfig(partitionConfig.getXTableConfig()) - .build()) - .syncMode(syncMode) - .build(); + ConversionConfig conversionConfig = + getTableSyncConfig( + HUDI, + syncMode, + tableName, + table, + targetTableFormats, + partitionConfig.getXTableConfig(), + null); ConversionController conversionController = new ConversionController(jsc.hadoopConfiguration()); - conversionController.sync(perTableConfig, conversionSourceProvider); + conversionController.sync(conversionConfig, conversionSourceProvider); checkDatasetEquivalence(HUDI, table, targetTableFormats, 50); table.insertRecordsWithCommitAlreadyStarted(insertsForCommit1, commitInstant1, true); - conversionController.sync(perTableConfig, conversionSourceProvider); + conversionController.sync(conversionConfig, conversionSourceProvider); checkDatasetEquivalence(HUDI, table, targetTableFormats, 100); } } @@ -314,20 +313,18 @@ public void testConcurrentInsertsAndTableServiceWrites( tableName, tempDir, jsc, partitionConfig.getHudiConfig(), tableType)) { List> insertedRecords1 = table.insertRecords(50, true); - PerTableConfig perTableConfig = - PerTableConfigImpl.builder() - .tableName(tableName) - .targetTableFormats(targetTableFormats) - .tableBasePath(table.getBasePath()) - .hudiSourceConfig( - HudiSourceConfigImpl.builder() - .partitionFieldSpecConfig(partitionConfig.getXTableConfig()) - .build()) - .syncMode(syncMode) - .build(); + ConversionConfig conversionConfig = + getTableSyncConfig( + HUDI, + syncMode, + tableName, + table, + targetTableFormats, + partitionConfig.getXTableConfig(), + null); ConversionController conversionController = new ConversionController(jsc.hadoopConfiguration()); - conversionController.sync(perTableConfig, conversionSourceProvider); + conversionController.sync(conversionConfig, conversionSourceProvider); checkDatasetEquivalence(HUDI, table, targetTableFormats, 50); table.deleteRecords(insertedRecords1.subList(0, 20), true); @@ -335,7 +332,7 @@ public void testConcurrentInsertsAndTableServiceWrites( String scheduledCompactionInstant = table.onlyScheduleCompaction(); table.insertRecords(50, true); - conversionController.sync(perTableConfig, 
conversionSourceProvider); + conversionController.sync(conversionConfig, conversionSourceProvider); Map sourceHudiOptions = Collections.singletonMap("hoodie.datasource.query.type", "read_optimized"); // Because compaction is not completed yet and read optimized query, there are 100 records. @@ -343,13 +340,13 @@ public void testConcurrentInsertsAndTableServiceWrites( HUDI, table, sourceHudiOptions, targetTableFormats, Collections.emptyMap(), 100); table.insertRecords(50, true); - conversionController.sync(perTableConfig, conversionSourceProvider); + conversionController.sync(conversionConfig, conversionSourceProvider); // Because compaction is not completed yet and read optimized query, there are 150 records. checkDatasetEquivalence( HUDI, table, sourceHudiOptions, targetTableFormats, Collections.emptyMap(), 150); table.completeScheduledCompaction(scheduledCompactionInstant); - conversionController.sync(perTableConfig, conversionSourceProvider); + conversionController.sync(conversionConfig, conversionSourceProvider); checkDatasetEquivalence(HUDI, table, targetTableFormats, 130); } } @@ -362,31 +359,32 @@ public void testTimeTravelQueries(String sourceTableFormat) throws Exception { GenericTable.getInstance(tableName, tempDir, sparkSession, jsc, sourceTableFormat, false)) { table.insertRows(50); List targetTableFormats = getOtherFormats(sourceTableFormat); - PerTableConfig perTableConfig = - PerTableConfigImpl.builder() - .tableName(tableName) - .targetTableFormats(targetTableFormats) - .tableBasePath(table.getBasePath()) - .tableDataPath(table.getDataPath()) - .syncMode(SyncMode.INCREMENTAL) - .build(); + ConversionConfig conversionConfig = + getTableSyncConfig( + sourceTableFormat, + SyncMode.INCREMENTAL, + tableName, + table, + targetTableFormats, + null, + null); ConversionSourceProvider conversionSourceProvider = getConversionSourceProvider(sourceTableFormat); ConversionController conversionController = new ConversionController(jsc.hadoopConfiguration()); - conversionController.sync(perTableConfig, conversionSourceProvider); + conversionController.sync(conversionConfig, conversionSourceProvider); Instant instantAfterFirstSync = Instant.now(); // sleep before starting the next commit to avoid any rounding issues Thread.sleep(1000); table.insertRows(50); - conversionController.sync(perTableConfig, conversionSourceProvider); + conversionController.sync(conversionConfig, conversionSourceProvider); Instant instantAfterSecondSync = Instant.now(); // sleep before starting the next commit to avoid any rounding issues Thread.sleep(1000); table.insertRows(50); - conversionController.sync(perTableConfig, conversionSourceProvider); + conversionController.sync(conversionConfig, conversionSourceProvider); checkDatasetEquivalence( sourceTableFormat, @@ -485,25 +483,22 @@ public void testPartitionedData(TableFormatPartitionDataHolder tableFormatPartit GenericTable.getInstance(tableName, tempDir, sparkSession, jsc, sourceTableFormat, true); } try (GenericTable tableToClose = table) { - PerTableConfig perTableConfig = - PerTableConfigImpl.builder() - .tableName(tableName) - .targetTableFormats(targetTableFormats) - .tableBasePath(tableToClose.getBasePath()) - .tableDataPath(tableToClose.getDataPath()) - .hudiSourceConfig( - HudiSourceConfigImpl.builder() - .partitionFieldSpecConfig(xTablePartitionConfig) - .build()) - .syncMode(SyncMode.INCREMENTAL) - .build(); + ConversionConfig conversionConfig = + getTableSyncConfig( + sourceTableFormat, + SyncMode.INCREMENTAL, + tableName, + table, + 
targetTableFormats, + xTablePartitionConfig, + null); tableToClose.insertRows(100); ConversionController conversionController = new ConversionController(jsc.hadoopConfiguration()); - conversionController.sync(perTableConfig, conversionSourceProvider); + conversionController.sync(conversionConfig, conversionSourceProvider); // Do a second sync to force the test to read back the metadata it wrote earlier tableToClose.insertRows(100); - conversionController.sync(perTableConfig, conversionSourceProvider); + conversionController.sync(conversionConfig, conversionSourceProvider); checkDatasetEquivalenceWithFilter( sourceTableFormat, tableToClose, targetTableFormats, filter); @@ -520,33 +515,23 @@ public void testSyncWithSingleFormat(SyncMode syncMode) { tableName, tempDir, null, HoodieTableType.COPY_ON_WRITE)) { table.insertRecords(100, true); - PerTableConfig perTableConfigIceberg = - PerTableConfigImpl.builder() - .tableName(tableName) - .targetTableFormats(ImmutableList.of(ICEBERG)) - .tableBasePath(table.getBasePath()) - .syncMode(syncMode) - .build(); - - PerTableConfig perTableConfigDelta = - PerTableConfigImpl.builder() - .tableName(tableName) - .targetTableFormats(ImmutableList.of(DELTA)) - .tableBasePath(table.getBasePath()) - .syncMode(syncMode) - .build(); + ConversionConfig conversionConfigIceberg = + getTableSyncConfig( + HUDI, syncMode, tableName, table, ImmutableList.of(ICEBERG), null, null); + ConversionConfig conversionConfigDelta = + getTableSyncConfig(HUDI, syncMode, tableName, table, ImmutableList.of(DELTA), null, null); ConversionController conversionController = new ConversionController(jsc.hadoopConfiguration()); - conversionController.sync(perTableConfigIceberg, conversionSourceProvider); + conversionController.sync(conversionConfigIceberg, conversionSourceProvider); checkDatasetEquivalence(HUDI, table, Collections.singletonList(ICEBERG), 100); - conversionController.sync(perTableConfigDelta, conversionSourceProvider); + conversionController.sync(conversionConfigDelta, conversionSourceProvider); checkDatasetEquivalence(HUDI, table, Collections.singletonList(DELTA), 100); table.insertRecords(100, true); - conversionController.sync(perTableConfigIceberg, conversionSourceProvider); + conversionController.sync(conversionConfigIceberg, conversionSourceProvider); checkDatasetEquivalence(HUDI, table, Collections.singletonList(ICEBERG), 200); - conversionController.sync(perTableConfigDelta, conversionSourceProvider); + conversionController.sync(conversionConfigDelta, conversionSourceProvider); checkDatasetEquivalence(HUDI, table, Collections.singletonList(DELTA), 200); } } @@ -558,21 +543,18 @@ public void testOutOfSyncIncrementalSyncs() { try (TestJavaHudiTable table = TestJavaHudiTable.forStandardSchema( tableName, tempDir, null, HoodieTableType.COPY_ON_WRITE)) { - PerTableConfig singleTableConfig = - PerTableConfigImpl.builder() - .tableName(tableName) - .targetTableFormats(ImmutableList.of(ICEBERG)) - .tableBasePath(table.getBasePath()) - .syncMode(SyncMode.INCREMENTAL) - .build(); - - PerTableConfig dualTableConfig = - PerTableConfigImpl.builder() - .tableName(tableName) - .targetTableFormats(Arrays.asList(ICEBERG, DELTA)) - .tableBasePath(table.getBasePath()) - .syncMode(SyncMode.INCREMENTAL) - .build(); + ConversionConfig singleTableConfig = + getTableSyncConfig( + HUDI, SyncMode.INCREMENTAL, tableName, table, ImmutableList.of(ICEBERG), null, null); + ConversionConfig dualTableConfig = + getTableSyncConfig( + HUDI, + SyncMode.INCREMENTAL, + tableName, + table, + 
Arrays.asList(ICEBERG, DELTA), + null, + null); table.insertRecords(50, true); ConversionController conversionController = @@ -612,18 +594,20 @@ public void testIcebergCorruptedSnapshotRecovery() throws Exception { table.insertRows(20); ConversionController conversionController = new ConversionController(jsc.hadoopConfiguration()); - PerTableConfig perTableConfig = - PerTableConfigImpl.builder() - .tableName(tableName) - .targetTableFormats(Collections.singletonList(ICEBERG)) - .tableBasePath(table.getBasePath()) - .syncMode(SyncMode.INCREMENTAL) - .build(); - conversionController.sync(perTableConfig, conversionSourceProvider); + ConversionConfig conversionConfig = + getTableSyncConfig( + HUDI, + SyncMode.INCREMENTAL, + tableName, + table, + Collections.singletonList(ICEBERG), + null, + null); + conversionController.sync(conversionConfig, conversionSourceProvider); table.insertRows(10); - conversionController.sync(perTableConfig, conversionSourceProvider); + conversionController.sync(conversionConfig, conversionSourceProvider); table.insertRows(10); - conversionController.sync(perTableConfig, conversionSourceProvider); + conversionController.sync(conversionConfig, conversionSourceProvider); // corrupt last two snapshots Table icebergTable = new HadoopTables(jsc.hadoopConfiguration()).load(table.getBasePath()); long currentSnapshotId = icebergTable.currentSnapshot().snapshotId(); @@ -633,7 +617,7 @@ public void testIcebergCorruptedSnapshotRecovery() throws Exception { Files.delete( Paths.get(URI.create(icebergTable.snapshot(previousSnapshotId).manifestListLocation()))); table.insertRows(10); - conversionController.sync(perTableConfig, conversionSourceProvider); + conversionController.sync(conversionConfig, conversionSourceProvider); checkDatasetEquivalence(HUDI, table, Collections.singletonList(ICEBERG), 50); } } @@ -645,18 +629,19 @@ public void testMetadataRetention() throws Exception { try (TestJavaHudiTable table = TestJavaHudiTable.forStandardSchema( tableName, tempDir, null, HoodieTableType.COPY_ON_WRITE)) { - PerTableConfig perTableConfig = - PerTableConfigImpl.builder() - .tableName(tableName) - .targetTableFormats(Arrays.asList(ICEBERG, DELTA)) - .tableBasePath(table.getBasePath()) - .syncMode(SyncMode.INCREMENTAL) - .targetMetadataRetentionInHours(0) // force cleanup - .build(); + ConversionConfig conversionConfig = + getTableSyncConfig( + HUDI, + SyncMode.INCREMENTAL, + tableName, + table, + Arrays.asList(ICEBERG, DELTA), + null, + Duration.ofHours(0)); // force cleanup ConversionController conversionController = new ConversionController(jsc.hadoopConfiguration()); table.insertRecords(10, true); - conversionController.sync(perTableConfig, conversionSourceProvider); + conversionController.sync(conversionConfig, conversionSourceProvider); // later we will ensure we can still read the source table at this instant to ensure that // neither target cleaned up the underlying parquet files in the table Instant instantAfterFirstCommit = Instant.now(); @@ -667,7 +652,7 @@ public void testMetadataRetention() throws Exception { .forEach( unused -> { table.insertRecords(10, true); - conversionController.sync(perTableConfig, conversionSourceProvider); + conversionController.sync(conversionConfig, conversionSourceProvider); }); // ensure that hudi rows can still be read and underlying files were not removed List rows = @@ -859,4 +844,45 @@ private static class TableFormatPartitionDataHolder { Optional hudiSourceConfig; String filter; } + + private static ConversionConfig getTableSyncConfig( + 
String sourceTableFormat, + SyncMode syncMode, + String tableName, + GenericTable table, + List targetTableFormats, + String partitionConfig, + Duration metadataRetention) { + Properties sourceProperties = new Properties(); + if (partitionConfig != null) { + sourceProperties.put(PARTITION_FIELD_SPEC_CONFIG, partitionConfig); + } + SourceTable sourceTable = + SourceTable.builder() + .name(tableName) + .formatName(sourceTableFormat) + .basePath(table.getBasePath()) + .dataPath(table.getDataPath()) + .additionalProperties(sourceProperties) + .build(); + + List targetTables = + targetTableFormats.stream() + .map( + formatName -> + TargetTable.builder() + .name(tableName) + .formatName(formatName) + // set the metadata path to the data path as the default (required by Hudi) + .basePath(table.getDataPath()) + .metadataRetention(metadataRetention) + .build()) + .collect(Collectors.toList()); + + return ConversionConfig.builder() + .sourceTable(sourceTable) + .targetTables(targetTables) + .syncMode(syncMode) + .build(); + } } diff --git a/xtable-core/src/test/java/org/apache/xtable/conversion/TestConversionConfig.java b/xtable-core/src/test/java/org/apache/xtable/conversion/TestConversionConfig.java new file mode 100644 index 000000000..6e502af06 --- /dev/null +++ b/xtable-core/src/test/java/org/apache/xtable/conversion/TestConversionConfig.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.xtable.conversion; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.Mockito.mock; + +import java.util.Collections; + +import org.junit.jupiter.api.Test; + +import org.apache.xtable.model.sync.SyncMode; + +class TestConversionConfig { + + @Test + void defaultValueSet() { + ConversionConfig conversionConfig = + ConversionConfig.builder() + .sourceTable(mock(SourceTable.class)) + .targetTables(Collections.singletonList(mock(TargetTable.class))) + .build(); + + assertEquals(SyncMode.INCREMENTAL, conversionConfig.getSyncMode()); + } + + @Test + void errorIfSourceTableNotSet() { + assertThrows( + NullPointerException.class, + () -> + ConversionConfig.builder() + .targetTables(Collections.singletonList(mock(TargetTable.class))) + .build()); + } + + @Test + void errorIfNoTargetsSet() { + Exception thrownException = + assertThrows( + IllegalArgumentException.class, + () -> + ConversionConfig.builder() + .sourceTable(mock(SourceTable.class)) + .targetTables(Collections.emptyList()) + .build()); + assertEquals("Please provide at-least one format to sync", thrownException.getMessage()); + } +} diff --git a/xtable-core/src/test/java/org/apache/xtable/conversion/TestConversionController.java b/xtable-core/src/test/java/org/apache/xtable/conversion/TestConversionController.java index 558b8ee5d..652bbe426 100644 --- a/xtable-core/src/test/java/org/apache/xtable/conversion/TestConversionController.java +++ b/xtable-core/src/test/java/org/apache/xtable/conversion/TestConversionController.java @@ -18,7 +18,7 @@ package org.apache.xtable.conversion; -import static org.apache.xtable.GenericTable.getTableName; +import static org.apache.xtable.model.storage.TableFormat.HUDI; import static org.junit.jupiter.api.Assertions.*; import static org.mockito.ArgumentMatchers.argThat; import static org.mockito.ArgumentMatchers.eq; @@ -79,13 +79,16 @@ void testAllSnapshotSyncAsPerConfig() { Map perTableResults = new HashMap<>(); perTableResults.put(TableFormat.ICEBERG, syncResult); perTableResults.put(TableFormat.DELTA, syncResult); - PerTableConfig perTableConfig = - getPerTableConfig(Arrays.asList(TableFormat.ICEBERG, TableFormat.DELTA), syncMode); - when(mockConversionSourceProvider.getConversionSourceInstance(perTableConfig)) + ConversionConfig conversionConfig = + getTableSyncConfig(Arrays.asList(TableFormat.ICEBERG, TableFormat.DELTA), syncMode); + when(mockConversionSourceProvider.getConversionSourceInstance( + conversionConfig.getSourceTable())) .thenReturn(mockConversionSource); - when(mockConversionTargetFactory.createForFormat(TableFormat.ICEBERG, perTableConfig, mockConf)) + when(mockConversionTargetFactory.createForFormat( + conversionConfig.getTargetTables().get(0), mockConf)) .thenReturn(mockConversionTarget1); - when(mockConversionTargetFactory.createForFormat(TableFormat.DELTA, perTableConfig, mockConf)) + when(mockConversionTargetFactory.createForFormat( + conversionConfig.getTargetTables().get(1), mockConf)) .thenReturn(mockConversionTarget2); when(mockConversionSource.getCurrentSnapshot()).thenReturn(internalSnapshot); when(tableFormatSync.syncSnapshot( @@ -95,20 +98,23 @@ void testAllSnapshotSyncAsPerConfig() { ConversionController conversionController = new ConversionController(mockConf, mockConversionTargetFactory, tableFormatSync); Map result = - conversionController.sync(perTableConfig, mockConversionSourceProvider); + conversionController.sync(conversionConfig, 
mockConversionSourceProvider); assertEquals(perTableResults, result); } @Test void testAllIncrementalSyncAsPerConfigAndNoFallbackNecessary() { SyncMode syncMode = SyncMode.INCREMENTAL; - PerTableConfig perTableConfig = - getPerTableConfig(Arrays.asList(TableFormat.ICEBERG, TableFormat.DELTA), syncMode); - when(mockConversionSourceProvider.getConversionSourceInstance(perTableConfig)) + ConversionConfig conversionConfig = + getTableSyncConfig(Arrays.asList(TableFormat.ICEBERG, TableFormat.DELTA), syncMode); + when(mockConversionSourceProvider.getConversionSourceInstance( + conversionConfig.getSourceTable())) .thenReturn(mockConversionSource); - when(mockConversionTargetFactory.createForFormat(TableFormat.ICEBERG, perTableConfig, mockConf)) + when(mockConversionTargetFactory.createForFormat( + conversionConfig.getTargetTables().get(0), mockConf)) .thenReturn(mockConversionTarget1); - when(mockConversionTargetFactory.createForFormat(TableFormat.DELTA, perTableConfig, mockConf)) + when(mockConversionTargetFactory.createForFormat( + conversionConfig.getTargetTables().get(1), mockConf)) .thenReturn(mockConversionTarget2); Instant instantAsOfNow = Instant.now(); @@ -178,7 +184,7 @@ void testAllIncrementalSyncAsPerConfigAndNoFallbackNecessary() { ConversionController conversionController = new ConversionController(mockConf, mockConversionTargetFactory, tableFormatSync); Map result = - conversionController.sync(perTableConfig, mockConversionSourceProvider); + conversionController.sync(conversionConfig, mockConversionSourceProvider); assertEquals(expectedSyncResult, result); } @@ -192,13 +198,16 @@ void testIncrementalSyncFallBackToSnapshotForAllFormats() { Map syncResults = new HashMap<>(); syncResults.put(TableFormat.ICEBERG, syncResult); syncResults.put(TableFormat.DELTA, syncResult); - PerTableConfig perTableConfig = - getPerTableConfig(Arrays.asList(TableFormat.ICEBERG, TableFormat.DELTA), syncMode); - when(mockConversionSourceProvider.getConversionSourceInstance(perTableConfig)) + ConversionConfig conversionConfig = + getTableSyncConfig(Arrays.asList(TableFormat.ICEBERG, TableFormat.DELTA), syncMode); + when(mockConversionSourceProvider.getConversionSourceInstance( + conversionConfig.getSourceTable())) .thenReturn(mockConversionSource); - when(mockConversionTargetFactory.createForFormat(TableFormat.ICEBERG, perTableConfig, mockConf)) + when(mockConversionTargetFactory.createForFormat( + conversionConfig.getTargetTables().get(0), mockConf)) .thenReturn(mockConversionTarget1); - when(mockConversionTargetFactory.createForFormat(TableFormat.DELTA, perTableConfig, mockConf)) + when(mockConversionTargetFactory.createForFormat( + conversionConfig.getTargetTables().get(1), mockConf)) .thenReturn(mockConversionTarget2); Instant instantAsOfNow = Instant.now(); @@ -219,20 +228,23 @@ void testIncrementalSyncFallBackToSnapshotForAllFormats() { ConversionController conversionController = new ConversionController(mockConf, mockConversionTargetFactory, tableFormatSync); Map result = - conversionController.sync(perTableConfig, mockConversionSourceProvider); + conversionController.sync(conversionConfig, mockConversionSourceProvider); assertEquals(syncResults, result); } @Test void testIncrementalSyncFallbackToSnapshotForOnlySingleFormat() { SyncMode syncMode = SyncMode.INCREMENTAL; - PerTableConfig perTableConfig = - getPerTableConfig(Arrays.asList(TableFormat.ICEBERG, TableFormat.DELTA), syncMode); - when(mockConversionSourceProvider.getConversionSourceInstance(perTableConfig)) + ConversionConfig 
conversionConfig = + getTableSyncConfig(Arrays.asList(TableFormat.ICEBERG, TableFormat.DELTA), syncMode); + when(mockConversionSourceProvider.getConversionSourceInstance( + conversionConfig.getSourceTable())) .thenReturn(mockConversionSource); - when(mockConversionTargetFactory.createForFormat(TableFormat.ICEBERG, perTableConfig, mockConf)) + when(mockConversionTargetFactory.createForFormat( + conversionConfig.getTargetTables().get(0), mockConf)) .thenReturn(mockConversionTarget1); - when(mockConversionTargetFactory.createForFormat(TableFormat.DELTA, perTableConfig, mockConf)) + when(mockConversionTargetFactory.createForFormat( + conversionConfig.getTargetTables().get(1), mockConf)) .thenReturn(mockConversionTarget2); Instant instantAsOfNow = Instant.now(); @@ -300,20 +312,23 @@ void testIncrementalSyncFallbackToSnapshotForOnlySingleFormat() { ConversionController conversionController = new ConversionController(mockConf, mockConversionTargetFactory, tableFormatSync); Map result = - conversionController.sync(perTableConfig, mockConversionSourceProvider); + conversionController.sync(conversionConfig, mockConversionSourceProvider); assertEquals(expectedSyncResult, result); } @Test void incrementalSyncWithNoPendingInstantsForAllFormats() { SyncMode syncMode = SyncMode.INCREMENTAL; - PerTableConfig perTableConfig = - getPerTableConfig(Arrays.asList(TableFormat.ICEBERG, TableFormat.DELTA), syncMode); - when(mockConversionSourceProvider.getConversionSourceInstance(perTableConfig)) + ConversionConfig conversionConfig = + getTableSyncConfig(Arrays.asList(TableFormat.ICEBERG, TableFormat.DELTA), syncMode); + when(mockConversionSourceProvider.getConversionSourceInstance( + conversionConfig.getSourceTable())) .thenReturn(mockConversionSource); - when(mockConversionTargetFactory.createForFormat(TableFormat.ICEBERG, perTableConfig, mockConf)) + when(mockConversionTargetFactory.createForFormat( + conversionConfig.getTargetTables().get(0), mockConf)) .thenReturn(mockConversionTarget1); - when(mockConversionTargetFactory.createForFormat(TableFormat.DELTA, perTableConfig, mockConf)) + when(mockConversionTargetFactory.createForFormat( + conversionConfig.getTargetTables().get(1), mockConf)) .thenReturn(mockConversionTarget2); Instant instantAsOfNow = Instant.now(); @@ -355,7 +370,7 @@ void incrementalSyncWithNoPendingInstantsForAllFormats() { ConversionController conversionController = new ConversionController(mockConf, mockConversionTargetFactory, tableFormatSync); Map result = - conversionController.sync(perTableConfig, mockConversionSourceProvider); + conversionController.sync(conversionConfig, mockConversionSourceProvider); assertEquals(expectedSyncResult, result); } @@ -394,14 +409,31 @@ private InternalTable getInternalTable(Instant instant) { } private Instant getInstantAtLastNMinutes(Instant currentInstant, int n) { - return Instant.now().minus(Duration.ofMinutes(n)); + return currentInstant.minus(Duration.ofMinutes(n)); } - private PerTableConfig getPerTableConfig(List targetTableFormats, SyncMode syncMode) { - return PerTableConfigImpl.builder() - .tableName(getTableName()) - .tableBasePath("/tmp/doesnt/matter") - .targetTableFormats(targetTableFormats) + private ConversionConfig getTableSyncConfig(List targetTableFormats, SyncMode syncMode) { + SourceTable sourceTable = + SourceTable.builder() + .name("tablename") + .formatName(HUDI) + .basePath("/tmp/doesnt/matter") + .build(); + + List targetTables = + targetTableFormats.stream() + .map( + formatName -> + TargetTable.builder() + 
.name("tablename") + .formatName(formatName) + .basePath("/tmp/doesnt/matter") + .build()) + .collect(Collectors.toList()); + + return ConversionConfig.builder() + .sourceTable(sourceTable) + .targetTables(targetTables) .syncMode(syncMode) .build(); } diff --git a/xtable-core/src/test/java/org/apache/xtable/conversion/TestConversionTargetFactory.java b/xtable-core/src/test/java/org/apache/xtable/conversion/TestConversionTargetFactory.java index 34d3cbef1..0984b42be 100644 --- a/xtable-core/src/test/java/org/apache/xtable/conversion/TestConversionTargetFactory.java +++ b/xtable-core/src/test/java/org/apache/xtable/conversion/TestConversionTargetFactory.java @@ -24,15 +24,11 @@ import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; -import java.util.Arrays; -import java.util.List; - import org.apache.hadoop.conf.Configuration; import org.junit.jupiter.api.Test; import org.apache.xtable.exception.NotSupportedException; import org.apache.xtable.model.storage.TableFormat; -import org.apache.xtable.model.sync.SyncMode; import org.apache.xtable.spi.sync.ConversionTarget; public class TestConversionTargetFactory { @@ -42,11 +38,10 @@ public void testConversionTargetFromNameForDELTA() { ConversionTarget tc = ConversionTargetFactory.getInstance().createConversionTargetForName(TableFormat.DELTA); assertNotNull(tc); - PerTableConfig perTableConfig = - getPerTableConfig(Arrays.asList(TableFormat.DELTA), SyncMode.INCREMENTAL); + TargetTable targetTable = getPerTableConfig(TableFormat.DELTA); Configuration conf = new Configuration(); conf.set("spark.master", "local"); - tc.init(perTableConfig, conf); + tc.init(targetTable, conf); assertEquals(tc.getTableFormat(), TableFormat.DELTA); } @@ -55,11 +50,10 @@ public void testConversionTargetFromNameForHUDI() { ConversionTarget tc = ConversionTargetFactory.getInstance().createConversionTargetForName(TableFormat.HUDI); assertNotNull(tc); - PerTableConfig perTableConfig = - getPerTableConfig(Arrays.asList(TableFormat.HUDI), SyncMode.INCREMENTAL); + TargetTable targetTable = getPerTableConfig(TableFormat.HUDI); Configuration conf = new Configuration(); conf.setStrings("spark.master", "local"); - tc.init(perTableConfig, conf); + tc.init(targetTable, conf); assertEquals(tc.getTableFormat(), TableFormat.HUDI); } @@ -68,11 +62,10 @@ public void testConversionTargetFromNameForICEBERG() { ConversionTarget tc = ConversionTargetFactory.getInstance().createConversionTargetForName(TableFormat.ICEBERG); assertNotNull(tc); - PerTableConfig perTableConfig = - getPerTableConfig(Arrays.asList(TableFormat.ICEBERG), SyncMode.INCREMENTAL); + TargetTable targetTable = getPerTableConfig(TableFormat.ICEBERG); Configuration conf = new Configuration(); conf.setStrings("spark.master", "local"); - tc.init(perTableConfig, conf); + tc.init(targetTable, conf); assertEquals(tc.getTableFormat(), TableFormat.ICEBERG); } @@ -88,22 +81,18 @@ public void testConversionTargetFromNameForUNKOWN() { @Test public void testConversionTargetFromFormatType() { - PerTableConfig perTableConfig = - getPerTableConfig(Arrays.asList(TableFormat.DELTA), SyncMode.INCREMENTAL); + TargetTable targetTable = getPerTableConfig(TableFormat.DELTA); Configuration conf = new Configuration(); conf.setStrings("spark.master", "local"); - ConversionTarget tc = - ConversionTargetFactory.getInstance() - .createForFormat(TableFormat.DELTA, perTableConfig, conf); + ConversionTarget tc = ConversionTargetFactory.getInstance().createForFormat(targetTable, conf); 
assertEquals(tc.getTableFormat(), TableFormat.DELTA); } - private PerTableConfig getPerTableConfig(List targetTableFormats, SyncMode syncMode) { - return PerTableConfigImpl.builder() - .tableName(getTableName()) - .tableBasePath("/tmp/doesnt/matter") - .targetTableFormats(targetTableFormats) - .syncMode(syncMode) + private TargetTable getPerTableConfig(String tableFormat) { + return TargetTable.builder() + .name(getTableName()) + .basePath("/tmp/doesnt/matter") + .formatName(tableFormat) .build(); } } diff --git a/xtable-core/src/test/java/org/apache/xtable/conversion/TestPerTableConfig.java b/xtable-core/src/test/java/org/apache/xtable/conversion/TestPerTableConfig.java deleted file mode 100644 index 4f057385e..000000000 --- a/xtable-core/src/test/java/org/apache/xtable/conversion/TestPerTableConfig.java +++ /dev/null @@ -1,118 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.xtable.conversion; - -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertNull; -import static org.junit.jupiter.api.Assertions.assertThrows; - -import java.util.Collections; - -import org.junit.jupiter.api.Test; - -import org.apache.xtable.hudi.HudiSourceConfigImpl; -import org.apache.xtable.model.storage.TableFormat; -import org.apache.xtable.model.sync.SyncMode; - -class TestPerTableConfig { - - @Test - void sanitizePath() { - PerTableConfig tooManySlashes = - PerTableConfigImpl.builder() - .tableBasePath("s3://bucket//path") - .tableName("name") - .targetTableFormats(Collections.singletonList(TableFormat.ICEBERG)) - .build(); - assertEquals("s3://bucket/path", tooManySlashes.getTableBasePath()); - - PerTableConfig localFilePath = - PerTableConfigImpl.builder() - .tableBasePath("/local/data//path") - .tableName("name") - .targetTableFormats(Collections.singletonList(TableFormat.ICEBERG)) - .build(); - assertEquals("file:///local/data/path", localFilePath.getTableBasePath()); - - PerTableConfig properLocalFilePath = - PerTableConfigImpl.builder() - .tableBasePath("file:///local/data//path") - .tableName("name") - .targetTableFormats(Collections.singletonList(TableFormat.ICEBERG)) - .build(); - assertEquals("file:///local/data/path", properLocalFilePath.getTableBasePath()); - } - - @Test - void defaultValueSet() { - PerTableConfig perTableConfig = - PerTableConfigImpl.builder() - .tableBasePath("file://bucket/path") - .tableName("name") - .targetTableFormats(Collections.singletonList(TableFormat.ICEBERG)) - .build(); - - assertEquals(24 * 7, perTableConfig.getTargetMetadataRetentionInHours()); - assertEquals(SyncMode.INCREMENTAL, perTableConfig.getSyncMode()); - assertEquals(HudiSourceConfigImpl.builder().build(), perTableConfig.getHudiSourceConfig()); - 
assertNull(perTableConfig.getNamespace()); - assertNull(perTableConfig.getIcebergCatalogConfig()); - } - - @Test - void errorIfRequiredArgsNotSet() { - assertThrows( - NullPointerException.class, - () -> - PerTableConfigImpl.builder() - .tableName("name") - .targetTableFormats(Collections.singletonList(TableFormat.ICEBERG)) - .build()); - - assertThrows( - NullPointerException.class, - () -> - PerTableConfigImpl.builder() - .tableBasePath("file://bucket/path") - .targetTableFormats(Collections.singletonList(TableFormat.ICEBERG)) - .build()); - - assertThrows( - NullPointerException.class, - () -> - PerTableConfigImpl.builder() - .tableBasePath("file://bucket/path") - .tableName("name") - .build()); - } - - @Test - void errorIfNoTargetsSet() { - Exception thrownException = - assertThrows( - IllegalArgumentException.class, - () -> - PerTableConfigImpl.builder() - .tableName("name") - .tableBasePath("file://bucket/path") - .targetTableFormats(Collections.emptyList()) - .build()); - assertEquals("Please provide at-least one format to sync", thrownException.getMessage()); - } -} diff --git a/xtable-core/src/test/java/org/apache/xtable/delta/ITDeltaConversionTargetSource.java b/xtable-core/src/test/java/org/apache/xtable/delta/ITDeltaConversionTargetSource.java index 3e2d61f5a..8fcf07533 100644 --- a/xtable-core/src/test/java/org/apache/xtable/delta/ITDeltaConversionTargetSource.java +++ b/xtable-core/src/test/java/org/apache/xtable/delta/ITDeltaConversionTargetSource.java @@ -53,8 +53,7 @@ import org.apache.xtable.GenericTable; import org.apache.xtable.TestSparkDeltaTable; import org.apache.xtable.ValidationTestHelper; -import org.apache.xtable.conversion.PerTableConfig; -import org.apache.xtable.conversion.PerTableConfigImpl; +import org.apache.xtable.conversion.SourceTable; import org.apache.xtable.model.CommitsBacklog; import org.apache.xtable.model.InstantsForIncrementalSync; import org.apache.xtable.model.InternalSnapshot; @@ -149,7 +148,7 @@ void setUp() { hadoopConf.set("fs.defaultFS", "file:///"); conversionSourceProvider = new DeltaConversionSourceProvider(); - conversionSourceProvider.init(hadoopConf, null); + conversionSourceProvider.init(hadoopConf); } @Test @@ -165,11 +164,11 @@ void getCurrentSnapshotNonPartitionedTest() throws URISyntaxException { + basePath + "' AS SELECT * FROM VALUES (1, 2)"); // Create Delta source - PerTableConfig tableConfig = - PerTableConfigImpl.builder() - .tableName(tableName) - .tableBasePath(basePath.toString()) - .targetTableFormats(Collections.singletonList(TableFormat.ICEBERG)) + SourceTable tableConfig = + SourceTable.builder() + .name(tableName) + .basePath(basePath.toString()) + .formatName(TableFormat.DELTA) .build(); DeltaConversionSource conversionSource = conversionSourceProvider.getConversionSourceInstance(tableConfig); @@ -223,11 +222,11 @@ void getCurrentSnapshotPartitionedTest() throws URISyntaxException { + basePath + "' AS SELECT 'SingleValue' AS part_col, 1 AS col1, 2 AS col2"); // Create Delta source - PerTableConfig tableConfig = - PerTableConfigImpl.builder() - .tableName(tableName) - .tableBasePath(basePath.toString()) - .targetTableFormats(Collections.singletonList(TableFormat.ICEBERG)) + SourceTable tableConfig = + SourceTable.builder() + .name(tableName) + .basePath(basePath.toString()) + .formatName(TableFormat.DELTA) .build(); DeltaConversionSource conversionSource = conversionSourceProvider.getConversionSourceInstance(tableConfig); @@ -311,11 +310,11 @@ void getCurrentSnapshotGenColPartitionedTest() { + tableName + "` 
VALUES(1, CAST('2012-02-12 00:12:34' AS TIMESTAMP))"); // Create Delta source - PerTableConfig tableConfig = - PerTableConfigImpl.builder() - .tableName(tableName) - .tableBasePath(basePath.toString()) - .targetTableFormats(Collections.singletonList(TableFormat.ICEBERG)) + SourceTable tableConfig = + SourceTable.builder() + .name(tableName) + .basePath(basePath.toString()) + .formatName(TableFormat.DELTA) .build(); DeltaConversionSource conversionSource = conversionSourceProvider.getConversionSourceInstance(tableConfig); @@ -350,11 +349,11 @@ public void testInsertsUpsertsAndDeletes(boolean isPartitioned) { testSparkDeltaTable.insertRows(50); allActiveFiles.add(testSparkDeltaTable.getAllActiveFiles()); - PerTableConfig tableConfig = - PerTableConfigImpl.builder() - .tableName(testSparkDeltaTable.getTableName()) - .tableBasePath(testSparkDeltaTable.getBasePath()) - .targetTableFormats(Arrays.asList(TableFormat.HUDI, TableFormat.ICEBERG)) + SourceTable tableConfig = + SourceTable.builder() + .name(testSparkDeltaTable.getTableName()) + .basePath(testSparkDeltaTable.getBasePath()) + .formatName(TableFormat.DELTA) .build(); DeltaConversionSource conversionSource = conversionSourceProvider.getConversionSourceInstance(tableConfig); @@ -390,11 +389,11 @@ public void testsShowingVacuumHasNoEffectOnIncrementalSync() { // Insert 50 rows to 2018 partition. List commit1Rows = testSparkDeltaTable.insertRowsForPartition(50, 2018); Long timestamp1 = testSparkDeltaTable.getLastCommitTimestamp(); - PerTableConfig tableConfig = - PerTableConfigImpl.builder() - .tableName(testSparkDeltaTable.getTableName()) - .tableBasePath(testSparkDeltaTable.getBasePath()) - .targetTableFormats(Arrays.asList(TableFormat.HUDI, TableFormat.ICEBERG)) + SourceTable tableConfig = + SourceTable.builder() + .name(testSparkDeltaTable.getTableName()) + .basePath(testSparkDeltaTable.getBasePath()) + .formatName(TableFormat.DELTA) .build(); DeltaConversionSource conversionSource = conversionSourceProvider.getConversionSourceInstance(tableConfig); @@ -458,11 +457,11 @@ public void testVacuum(boolean isPartitioned) { testSparkDeltaTable.insertRows(50); allActiveFiles.add(testSparkDeltaTable.getAllActiveFiles()); - PerTableConfig tableConfig = - PerTableConfigImpl.builder() - .tableName(testSparkDeltaTable.getTableName()) - .tableBasePath(testSparkDeltaTable.getBasePath()) - .targetTableFormats(Arrays.asList(TableFormat.HUDI, TableFormat.ICEBERG)) + SourceTable tableConfig = + SourceTable.builder() + .name(testSparkDeltaTable.getTableName()) + .basePath(testSparkDeltaTable.getBasePath()) + .formatName(TableFormat.DELTA) .build(); DeltaConversionSource conversionSource = conversionSourceProvider.getConversionSourceInstance(tableConfig); @@ -506,11 +505,11 @@ public void testAddColumns(boolean isPartitioned) { testSparkDeltaTable.insertRows(50); allActiveFiles.add(testSparkDeltaTable.getAllActiveFiles()); - PerTableConfig tableConfig = - PerTableConfigImpl.builder() - .tableName(testSparkDeltaTable.getTableName()) - .tableBasePath(testSparkDeltaTable.getBasePath()) - .targetTableFormats(Arrays.asList(TableFormat.HUDI, TableFormat.ICEBERG)) + SourceTable tableConfig = + SourceTable.builder() + .name(testSparkDeltaTable.getTableName()) + .basePath(testSparkDeltaTable.getBasePath()) + .formatName(TableFormat.DELTA) .build(); DeltaConversionSource conversionSource = conversionSourceProvider.getConversionSourceInstance(tableConfig); @@ -563,11 +562,11 @@ public void testDropPartition() { testSparkDeltaTable.insertRowsForPartition(20, 
partitionValueToDelete); allActiveFiles.add(testSparkDeltaTable.getAllActiveFiles()); - PerTableConfig tableConfig = - PerTableConfigImpl.builder() - .tableName(testSparkDeltaTable.getTableName()) - .tableBasePath(testSparkDeltaTable.getBasePath()) - .targetTableFormats(Arrays.asList(TableFormat.HUDI, TableFormat.ICEBERG)) + SourceTable tableConfig = + SourceTable.builder() + .name(testSparkDeltaTable.getTableName()) + .basePath(testSparkDeltaTable.getBasePath()) + .formatName(TableFormat.DELTA) .build(); DeltaConversionSource conversionSource = conversionSourceProvider.getConversionSourceInstance(tableConfig); @@ -623,11 +622,11 @@ public void testOptimizeAndClustering(boolean isPartitioned) { testSparkDeltaTable.insertRows(50); allActiveFiles.add(testSparkDeltaTable.getAllActiveFiles()); - PerTableConfig tableConfig = - PerTableConfigImpl.builder() - .tableName(testSparkDeltaTable.getTableName()) - .tableBasePath(testSparkDeltaTable.getBasePath()) - .targetTableFormats(Arrays.asList(TableFormat.HUDI, TableFormat.ICEBERG)) + SourceTable tableConfig = + SourceTable.builder() + .name(testSparkDeltaTable.getTableName()) + .basePath(testSparkDeltaTable.getBasePath()) + .formatName(TableFormat.DELTA) .build(); DeltaConversionSource conversionSource = conversionSourceProvider.getConversionSourceInstance(tableConfig); diff --git a/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaSync.java b/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaSync.java index 135ce9953..f0f889d25 100644 --- a/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaSync.java +++ b/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaSync.java @@ -26,7 +26,9 @@ import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; +import java.time.Duration; import java.time.Instant; +import java.time.temporal.ChronoUnit; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -76,7 +78,7 @@ import io.delta.standalone.types.IntegerType; import io.delta.standalone.types.StringType; -import org.apache.xtable.conversion.PerTableConfigImpl; +import org.apache.xtable.conversion.TargetTable; import org.apache.xtable.model.InternalSnapshot; import org.apache.xtable.model.InternalTable; import org.apache.xtable.model.schema.InternalField; @@ -126,11 +128,11 @@ public void setup() throws IOException { Files.createDirectories(basePath); conversionTarget = new DeltaConversionTarget( - PerTableConfigImpl.builder() - .tableName(tableName) - .tableBasePath(basePath.toString()) - .targetMetadataRetentionInHours(1) - .targetTableFormats(Collections.singletonList(TableFormat.DELTA)) + TargetTable.builder() + .name(tableName) + .basePath(basePath.toString()) + .metadataRetention(Duration.of(1, ChronoUnit.HOURS)) + .formatName(TableFormat.DELTA) .build(), sparkSession); } diff --git a/xtable-core/src/test/java/org/apache/xtable/hudi/ITHudiConversionSourceSource.java b/xtable-core/src/test/java/org/apache/xtable/hudi/ITHudiConversionSourceSource.java index 408e43733..c074bc23f 100644 --- a/xtable-core/src/test/java/org/apache/xtable/hudi/ITHudiConversionSourceSource.java +++ b/xtable-core/src/test/java/org/apache/xtable/hudi/ITHudiConversionSourceSource.java @@ -569,7 +569,7 @@ private HudiConversionSource getHudiSourceClient( .build(); HudiSourcePartitionSpecExtractor partitionSpecExtractor = new ConfigurationBasedPartitionSpecExtractor( - HudiSourceConfigImpl.builder().partitionFieldSpecConfig(xTablePartitionConfig).build()); + 
HudiSourceConfig.fromPartitionFieldSpecConfig(xTablePartitionConfig)); return new HudiConversionSource(hoodieTableMetaClient, partitionSpecExtractor); } diff --git a/xtable-core/src/test/java/org/apache/xtable/hudi/ITHudiConversionSourceTarget.java b/xtable-core/src/test/java/org/apache/xtable/hudi/ITHudiConversionSourceTarget.java index 7990bfbf1..128855672 100644 --- a/xtable-core/src/test/java/org/apache/xtable/hudi/ITHudiConversionSourceTarget.java +++ b/xtable-core/src/test/java/org/apache/xtable/hudi/ITHudiConversionSourceTarget.java @@ -24,6 +24,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import java.nio.file.Path; +import java.time.Duration; import java.time.Instant; import java.time.temporal.ChronoUnit; import java.util.Arrays; @@ -69,7 +70,7 @@ import org.apache.hudi.metadata.HoodieBackedTableMetadata; import org.apache.hudi.metadata.HoodieMetadataFileSystemView; -import org.apache.xtable.conversion.PerTableConfigImpl; +import org.apache.xtable.conversion.TargetTable; import org.apache.xtable.model.InternalTable; import org.apache.xtable.model.metadata.TableSyncMetadata; import org.apache.xtable.model.schema.InternalField; @@ -582,11 +583,11 @@ private InternalTable getState(Instant latestCommitTime) { private HudiConversionTarget getTargetClient() { return new HudiConversionTarget( - PerTableConfigImpl.builder() - .tableBasePath(tableBasePath) - .targetTableFormats(Collections.singletonList(TableFormat.HUDI)) - .tableName("test_table") - .targetMetadataRetentionInHours(4) + TargetTable.builder() + .basePath(tableBasePath) + .formatName(TableFormat.HUDI) + .name("test_table") + .metadataRetention(Duration.of(4, ChronoUnit.HOURS)) .build(), CONFIGURATION, 3); diff --git a/xtable-core/src/test/java/org/apache/xtable/iceberg/ITIcebergConversionTargetSource.java b/xtable-core/src/test/java/org/apache/xtable/iceberg/ITIcebergConversionTargetSource.java index 997ab1842..3f20ac9db 100644 --- a/xtable-core/src/test/java/org/apache/xtable/iceberg/ITIcebergConversionTargetSource.java +++ b/xtable-core/src/test/java/org/apache/xtable/iceberg/ITIcebergConversionTargetSource.java @@ -29,7 +29,6 @@ import java.time.Instant; import java.time.temporal.ChronoUnit; import java.util.ArrayList; -import java.util.Arrays; import java.util.List; import java.util.Map; @@ -46,8 +45,7 @@ import org.apache.iceberg.data.Record; import org.apache.xtable.TestIcebergTable; -import org.apache.xtable.conversion.PerTableConfig; -import org.apache.xtable.conversion.PerTableConfigImpl; +import org.apache.xtable.conversion.SourceTable; import org.apache.xtable.model.CommitsBacklog; import org.apache.xtable.model.InstantsForIncrementalSync; import org.apache.xtable.model.InternalSnapshot; @@ -65,7 +63,7 @@ public class ITIcebergConversionTargetSource { @BeforeEach void setup() { sourceProvider = new IcebergConversionSourceProvider(); - sourceProvider.init(hadoopConf, null); + sourceProvider.init(hadoopConf); } @ParameterizedTest @@ -97,11 +95,11 @@ public void testInsertsUpsertsAndDeletes(boolean isPartitioned) { testIcebergTable.insertRows(50); allActiveFiles.add(testIcebergTable.getAllActiveFiles()); - PerTableConfig tableConfig = - PerTableConfigImpl.builder() - .tableName(testIcebergTable.getTableName()) - .tableBasePath(testIcebergTable.getBasePath()) - .targetTableFormats(Arrays.asList(TableFormat.HUDI, TableFormat.DELTA)) + SourceTable tableConfig = + SourceTable.builder() + .name(testIcebergTable.getTableName()) + .basePath(testIcebergTable.getBasePath()) + 
.formatName(TableFormat.ICEBERG) .build(); IcebergConversionSource conversionSource = sourceProvider.getConversionSourceInstance(tableConfig); @@ -157,11 +155,11 @@ public void testDropPartition() { testIcebergTable.insertRecordsForPartition(20, partitionValueToDelete); allActiveFiles.add(testIcebergTable.getAllActiveFiles()); - PerTableConfig tableConfig = - PerTableConfigImpl.builder() - .tableName(testIcebergTable.getTableName()) - .tableBasePath(testIcebergTable.getBasePath()) - .targetTableFormats(Arrays.asList(TableFormat.HUDI, TableFormat.DELTA)) + SourceTable tableConfig = + SourceTable.builder() + .name(testIcebergTable.getTableName()) + .basePath(testIcebergTable.getBasePath()) + .formatName(TableFormat.ICEBERG) .build(); IcebergConversionSource conversionSource = sourceProvider.getConversionSourceInstance(tableConfig); @@ -217,11 +215,11 @@ public void testDeleteAllRecordsInPartition() { testIcebergTable.insertRecordsForPartition(20, partitionValueToDelete); allActiveFiles.add(testIcebergTable.getAllActiveFiles()); - PerTableConfig tableConfig = - PerTableConfigImpl.builder() - .tableName(testIcebergTable.getTableName()) - .tableBasePath(testIcebergTable.getBasePath()) - .targetTableFormats(Arrays.asList(TableFormat.HUDI, TableFormat.DELTA)) + SourceTable tableConfig = + SourceTable.builder() + .name(testIcebergTable.getTableName()) + .basePath(testIcebergTable.getBasePath()) + .formatName(TableFormat.ICEBERG) .build(); IcebergConversionSource conversionSource = sourceProvider.getConversionSourceInstance(tableConfig); @@ -277,11 +275,11 @@ public void testExpireSnapshots(boolean isPartitioned) throws InterruptedExcepti testIcebergTable.insertRows(50); allActiveFiles.add(testIcebergTable.getAllActiveFiles()); - PerTableConfig tableConfig = - PerTableConfigImpl.builder() - .tableName(testIcebergTable.getTableName()) - .tableBasePath(testIcebergTable.getBasePath()) - .targetTableFormats(Arrays.asList(TableFormat.HUDI, TableFormat.DELTA)) + SourceTable tableConfig = + SourceTable.builder() + .name(testIcebergTable.getTableName()) + .basePath(testIcebergTable.getBasePath()) + .formatName(TableFormat.ICEBERG) .build(); IcebergConversionSource conversionSource = sourceProvider.getConversionSourceInstance(tableConfig); @@ -318,12 +316,11 @@ public void testForIncrementalSyncSafetyCheck(boolean shouldExpireSnapshots) { // Insert 50 rows to INFO partition. List commit1Rows = testIcebergTable.insertRecordsForPartition(50, "INFO"); Long timestamp1 = testIcebergTable.getLastCommitTimestamp(); - PerTableConfig tableConfig = - PerTableConfigImpl.builder() - .tableName(testIcebergTable.getTableName()) - .tableBasePath(testIcebergTable.getBasePath()) - .tableDataPath(testIcebergTable.getDataPath()) - .targetTableFormats(Arrays.asList(TableFormat.HUDI, TableFormat.DELTA)) + SourceTable tableConfig = + SourceTable.builder() + .name(testIcebergTable.getTableName()) + .basePath(testIcebergTable.getBasePath()) + .formatName(TableFormat.ICEBERG) .build(); // Upsert all rows inserted before, so all files are replaced. 
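A minimal sketch of how the refactored configuration classes compose end to end, mirroring the builder usage in the test helpers above. The class name, table name, paths, partition spec value, and retention value here are illustrative placeholders only, not part of this patch:

import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.stream.Collectors;

import org.apache.hadoop.conf.Configuration;

import org.apache.xtable.conversion.ConversionConfig;
import org.apache.xtable.conversion.ConversionController;
import org.apache.xtable.conversion.SourceTable;
import org.apache.xtable.conversion.TargetTable;
import org.apache.xtable.hudi.HudiConversionSourceProvider;
import org.apache.xtable.hudi.HudiSourceConfig;
import org.apache.xtable.model.storage.TableFormat;
import org.apache.xtable.model.sync.SyncMode;

public class ConversionConfigExample {
  public static void main(String[] args) {
    Configuration hadoopConf = new Configuration();

    // Hudi-specific options now travel as additional properties on the source table.
    Properties sourceProperties = new Properties();
    sourceProperties.put(HudiSourceConfig.PARTITION_FIELD_SPEC_CONFIG, "level:VALUE");

    SourceTable sourceTable =
        SourceTable.builder()
            .name("my_table")
            .formatName(TableFormat.HUDI)
            .basePath("s3://bucket/my_table")
            .additionalProperties(sourceProperties)
            .build();

    // One TargetTable per output format; metadataRetention replaces targetMetadataRetentionInHours.
    List<TargetTable> targetTables =
        Arrays.asList(TableFormat.ICEBERG, TableFormat.DELTA).stream()
            .map(
                formatName ->
                    TargetTable.builder()
                        .name("my_table")
                        .formatName(formatName)
                        .basePath("s3://bucket/my_table")
                        .metadataRetention(Duration.ofHours(24 * 7))
                        .build())
            .collect(Collectors.toList());

    ConversionConfig conversionConfig =
        ConversionConfig.builder()
            .sourceTable(sourceTable)
            .targetTables(targetTables)
            .syncMode(SyncMode.INCREMENTAL)
            .build();

    // Source providers are now initialized with only the Hadoop configuration.
    HudiConversionSourceProvider sourceProvider = new HudiConversionSourceProvider();
    sourceProvider.init(hadoopConf);

    new ConversionController(hadoopConf).sync(conversionConfig, sourceProvider);
  }
}

The same shape is what the updated test helpers build: source-format specifics live on SourceTable, per-format targets become separate TargetTable entries, and ConversionConfig ties them together with the sync mode.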
diff --git a/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergConversionTargetSource.java b/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergConversionTargetSource.java index d100a3134..784d6585b 100644 --- a/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergConversionTargetSource.java +++ b/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergConversionTargetSource.java @@ -26,7 +26,6 @@ import java.nio.file.Path; import java.nio.file.Paths; import java.time.Instant; -import java.util.Collections; import java.util.List; import java.util.Map; import java.util.UUID; @@ -48,8 +47,7 @@ import org.apache.iceberg.parquet.Parquet; import org.apache.iceberg.types.Types; -import org.apache.xtable.conversion.PerTableConfig; -import org.apache.xtable.conversion.PerTableConfigImpl; +import org.apache.xtable.conversion.SourceTable; import org.apache.xtable.model.CommitsBacklog; import org.apache.xtable.model.InstantsForIncrementalSync; import org.apache.xtable.model.InternalSnapshot; @@ -77,7 +75,7 @@ void setUp() throws IOException { hadoopConf.set("fs.defaultFS", "file:///"); sourceProvider = new IcebergConversionSourceProvider(); - sourceProvider.init(hadoopConf, null); + sourceProvider.init(hadoopConf); tableManager = IcebergTableManager.of(hadoopConf); @@ -91,7 +89,7 @@ void setUp() throws IOException { @Test void getTableTest(@TempDir Path workingDir) throws IOException { Table catalogSales = createTestTableWithData(workingDir.toString()); - PerTableConfig sourceTableConfig = getPerTableConfig(catalogSales); + SourceTable sourceTableConfig = getPerTableConfig(catalogSales); IcebergConversionSource conversionSource = sourceProvider.getConversionSourceInstance(sourceTableConfig); @@ -123,7 +121,7 @@ public void testGetCurrentSnapshot(@TempDir Path workingDir) throws IOException Table catalogSales = createTestTableWithData(workingDir.toString()); Snapshot iceCurrentSnapshot = catalogSales.currentSnapshot(); - PerTableConfig sourceTableConfig = getPerTableConfig(catalogSales); + SourceTable sourceTableConfig = getPerTableConfig(catalogSales); IcebergDataFileExtractor spyDataFileExtractor = spy(IcebergDataFileExtractor.builder().build()); IcebergPartitionValueConverter spyPartitionConverter = @@ -384,7 +382,7 @@ private Table createTestCatalogTable(String workingDir) { } private IcebergConversionSource getIcebergConversionSource(Table catalogSales) { - PerTableConfig tableConfig = getPerTableConfig(catalogSales); + SourceTable tableConfig = getPerTableConfig(catalogSales); return IcebergConversionSource.builder() .hadoopConf(hadoopConf) @@ -392,11 +390,11 @@ private IcebergConversionSource getIcebergConversionSource(Table catalogSales) { .build(); } - private static PerTableConfig getPerTableConfig(Table catalogSales) { - return PerTableConfigImpl.builder() - .tableName(catalogSales.name()) - .tableBasePath(catalogSales.location()) - .targetTableFormats(Collections.singletonList(TableFormat.DELTA)) + private static SourceTable getPerTableConfig(Table catalogSales) { + return SourceTable.builder() + .name(catalogSales.name()) + .basePath(catalogSales.location()) + .formatName(TableFormat.ICEBERG) .build(); } diff --git a/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergSync.java b/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergSync.java index 45140b447..bd36dde91 100644 --- a/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergSync.java +++ 
b/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergSync.java @@ -37,7 +37,9 @@ import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; +import java.time.Duration; import java.time.Instant; +import java.time.temporal.ChronoUnit; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -81,7 +83,7 @@ import com.google.common.collect.Sets; import org.apache.xtable.ITConversionController; -import org.apache.xtable.conversion.PerTableConfigImpl; +import org.apache.xtable.conversion.TargetTable; import org.apache.xtable.model.InternalSnapshot; import org.apache.xtable.model.InternalTable; import org.apache.xtable.model.metadata.TableSyncMetadata; @@ -189,11 +191,11 @@ public void setup() throws IOException { private IcebergConversionTarget getConversionTarget() { return new IcebergConversionTarget( - PerTableConfigImpl.builder() - .tableBasePath(basePath.toString()) - .tableName(tableName) - .targetMetadataRetentionInHours(1) - .targetTableFormats(Collections.singletonList(TableFormat.ICEBERG)) + TargetTable.builder() + .basePath(basePath.toString()) + .name(tableName) + .metadataRetention(Duration.of(1, ChronoUnit.HOURS)) + .formatName(TableFormat.ICEBERG) .build(), CONFIGURATION, mockSchemaExtractor, diff --git a/xtable-core/src/test/java/org/apache/xtable/loadtest/LoadTest.java b/xtable-core/src/test/java/org/apache/xtable/loadtest/LoadTest.java index 8b1a24e06..341b2cb02 100644 --- a/xtable-core/src/test/java/org/apache/xtable/loadtest/LoadTest.java +++ b/xtable-core/src/test/java/org/apache/xtable/loadtest/LoadTest.java @@ -18,9 +18,12 @@ package org.apache.xtable.loadtest; +import static org.apache.xtable.hudi.HudiSourceConfig.PARTITION_FIELD_SPEC_CONFIG; + import java.nio.file.Path; import java.util.Arrays; -import java.util.Collections; +import java.util.List; +import java.util.Properties; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -34,11 +37,13 @@ import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.config.HoodieArchivalConfig; +import org.apache.xtable.GenericTable; import org.apache.xtable.TestJavaHudiTable; +import org.apache.xtable.conversion.ConversionConfig; import org.apache.xtable.conversion.ConversionController; import org.apache.xtable.conversion.ConversionSourceProvider; -import org.apache.xtable.conversion.PerTableConfig; -import org.apache.xtable.conversion.PerTableConfigImpl; +import org.apache.xtable.conversion.SourceTable; +import org.apache.xtable.conversion.TargetTable; import org.apache.xtable.hudi.HudiConversionSourceProvider; import org.apache.xtable.model.storage.TableFormat; import org.apache.xtable.model.sync.SyncMode; @@ -56,7 +61,7 @@ public class LoadTest { @BeforeEach public void setup() { hudiConversionSourceProvider = new HudiConversionSourceProvider(); - hudiConversionSourceProvider.init(CONFIGURATION, Collections.emptyMap()); + hudiConversionSourceProvider.init(CONFIGURATION); } @Test @@ -75,16 +80,15 @@ void testFullSyncWithManyPartitions() { .collect(Collectors.toList()), false); } - PerTableConfig perTableConfig = - PerTableConfigImpl.builder() - .tableName(tableName) - .targetTableFormats(Arrays.asList(TableFormat.ICEBERG, TableFormat.DELTA)) - .tableBasePath(table.getBasePath()) - .syncMode(SyncMode.FULL) - .build(); + ConversionConfig conversionConfig = + getTableSyncConfig( + SyncMode.FULL, + tableName, + table, + Arrays.asList(TableFormat.ICEBERG, TableFormat.DELTA)); ConversionController conversionController = new 
ConversionController(CONFIGURATION); long start = System.currentTimeMillis(); - conversionController.sync(perTableConfig, hudiConversionSourceProvider); + conversionController.sync(conversionConfig, hudiConversionSourceProvider); long end = System.currentTimeMillis(); System.out.println("Full sync took " + (end - start) + "ms"); } @@ -104,16 +108,15 @@ void testIncrementalSyncWithManyCommits() { TestJavaHudiTable.forStandardSchema( tableName, tempDir, "level:SIMPLE", HoodieTableType.COPY_ON_WRITE, archivalConfig)) { table.insertRecords(1, "partition0", false); - PerTableConfig perTableConfig = - PerTableConfigImpl.builder() - .tableName(tableName) - .targetTableFormats(Arrays.asList(TableFormat.ICEBERG, TableFormat.DELTA)) - .tableBasePath(table.getBasePath()) - .syncMode(SyncMode.INCREMENTAL) - .build(); + ConversionConfig conversionConfig = + getTableSyncConfig( + SyncMode.INCREMENTAL, + tableName, + table, + Arrays.asList(TableFormat.ICEBERG, TableFormat.DELTA)); // sync once to establish first commit ConversionController conversionController = new ConversionController(CONFIGURATION); - conversionController.sync(perTableConfig, hudiConversionSourceProvider); + conversionController.sync(conversionConfig, hudiConversionSourceProvider); for (int i = 0; i < numCommits; i++) { table.insertRecords( 1, @@ -124,9 +127,40 @@ void testIncrementalSyncWithManyCommits() { } long start = System.currentTimeMillis(); - conversionController.sync(perTableConfig, hudiConversionSourceProvider); + conversionController.sync(conversionConfig, hudiConversionSourceProvider); long end = System.currentTimeMillis(); System.out.println("Incremental sync took " + (end - start) + "ms"); } } + + private static ConversionConfig getTableSyncConfig( + SyncMode syncMode, String tableName, GenericTable table, List targetTableFormats) { + Properties sourceProperties = new Properties(); + sourceProperties.put(PARTITION_FIELD_SPEC_CONFIG, "level:VALUE"); + SourceTable sourceTable = + SourceTable.builder() + .name(tableName) + .formatName(TableFormat.HUDI) + .basePath(table.getBasePath()) + .dataPath(table.getDataPath()) + .additionalProperties(sourceProperties) + .build(); + + List targetTables = + targetTableFormats.stream() + .map( + formatName -> + TargetTable.builder() + .name(tableName) + .formatName(formatName) + .basePath(table.getBasePath()) + .build()) + .collect(Collectors.toList()); + + return ConversionConfig.builder() + .sourceTable(sourceTable) + .targetTables(targetTables) + .syncMode(syncMode) + .build(); + } } diff --git a/xtable-hudi-support/xtable-hudi-support-extensions/src/main/java/org/apache/xtable/hudi/sync/XTableSyncConfig.java b/xtable-hudi-support/xtable-hudi-support-extensions/src/main/java/org/apache/xtable/hudi/sync/XTableSyncConfig.java index 67b004170..4d878cea5 100644 --- a/xtable-hudi-support/xtable-hudi-support-extensions/src/main/java/org/apache/xtable/hudi/sync/XTableSyncConfig.java +++ b/xtable-hudi-support/xtable-hudi-support-extensions/src/main/java/org/apache/xtable/hudi/sync/XTableSyncConfig.java @@ -26,12 +26,12 @@ public class XTableSyncConfig extends HoodieSyncConfig implements Serializable { - public static final ConfigProperty ONE_TABLE_FORMATS = + public static final ConfigProperty XTABLE_FORMATS = ConfigProperty.key("hoodie.xtable.formats.to.sync") .defaultValue("DELTA,ICEBERG") .withDocumentation("Comma separated list of formats to sync."); - public static final ConfigProperty ONE_TABLE_TARGET_METADATA_RETENTION_HOURS = + public static final ConfigProperty 
XTABLE_TARGET_METADATA_RETENTION_HOURS = ConfigProperty.key("hoodie.xtable.target.metadata.retention.hr") .defaultValue(24 * 7) .withDocumentation("Retention in hours for metadata in target table."); diff --git a/xtable-hudi-support/xtable-hudi-support-extensions/src/main/java/org/apache/xtable/hudi/sync/XTableSyncTool.java b/xtable-hudi-support/xtable-hudi-support-extensions/src/main/java/org/apache/xtable/hudi/sync/XTableSyncTool.java index 49a7a7435..a9653eb51 100644 --- a/xtable-hudi-support/xtable-hudi-support-extensions/src/main/java/org/apache/xtable/hudi/sync/XTableSyncTool.java +++ b/xtable-hudi-support/xtable-hudi-support-extensions/src/main/java/org/apache/xtable/hudi/sync/XTableSyncTool.java @@ -18,8 +18,11 @@ package org.apache.xtable.hudi.sync; +import static org.apache.xtable.hudi.HudiSourceConfig.PARTITION_FIELD_SPEC_CONFIG; +import static org.apache.xtable.model.storage.TableFormat.HUDI; + +import java.time.Duration; import java.util.Arrays; -import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Properties; @@ -34,11 +37,11 @@ import org.apache.hudi.sync.common.HoodieSyncConfig; import org.apache.hudi.sync.common.HoodieSyncTool; +import org.apache.xtable.conversion.ConversionConfig; import org.apache.xtable.conversion.ConversionController; -import org.apache.xtable.conversion.PerTableConfig; -import org.apache.xtable.conversion.PerTableConfigImpl; +import org.apache.xtable.conversion.SourceTable; +import org.apache.xtable.conversion.TargetTable; import org.apache.xtable.hudi.HudiConversionSourceProvider; -import org.apache.xtable.hudi.HudiSourceConfigImpl; import org.apache.xtable.model.schema.PartitionTransformType; import org.apache.xtable.model.sync.SyncMode; import org.apache.xtable.model.sync.SyncResult; @@ -55,39 +58,57 @@ public XTableSyncTool(Properties props, Configuration hadoopConf) { super(props, hadoopConf); this.config = new XTableSyncConfig(props); this.hudiConversionSourceProvider = new HudiConversionSourceProvider(); - hudiConversionSourceProvider.init(hadoopConf, Collections.emptyMap()); + hudiConversionSourceProvider.init(hadoopConf); } @Override public void syncHoodieTable() { List formatsToSync = - Arrays.stream(config.getString(XTableSyncConfig.ONE_TABLE_FORMATS).split(",")) + Arrays.stream(config.getString(XTableSyncConfig.XTABLE_FORMATS).split(",")) .map(format -> format.toUpperCase()) .collect(Collectors.toList()); String basePath = config.getString(HoodieSyncConfig.META_SYNC_BASE_PATH); String tableName = config.getString(HoodieTableConfig.HOODIE_TABLE_NAME_KEY); - PerTableConfig perTableConfig = - PerTableConfigImpl.builder() - .tableName(tableName) - .tableBasePath(basePath) - .targetTableFormats(formatsToSync) - .hudiSourceConfig( - HudiSourceConfigImpl.builder() - .partitionFieldSpecConfig(getPartitionSpecConfig()) - .build()) + Properties sourceProperties = new Properties(); + sourceProperties.put(PARTITION_FIELD_SPEC_CONFIG, getPartitionSpecConfig()); + SourceTable sourceTable = + SourceTable.builder() + .name(tableName) + .formatName(HUDI) + .basePath(basePath) + .additionalProperties(sourceProperties) + .build(); + Duration metadataRetention = + config.contains(XTableSyncConfig.XTABLE_TARGET_METADATA_RETENTION_HOURS) + ? 
Duration.ofHours( + config.getInt(XTableSyncConfig.XTABLE_TARGET_METADATA_RETENTION_HOURS)) + : null; + List targetTables = + formatsToSync.stream() + .map( + format -> + TargetTable.builder() + .basePath(basePath) + .metadataRetention(metadataRetention) + .formatName(format) + .name(tableName) + .build()) + .collect(Collectors.toList()); + ConversionConfig conversionConfig = + ConversionConfig.builder() + .sourceTable(sourceTable) + .targetTables(targetTables) .syncMode(SyncMode.INCREMENTAL) - .targetMetadataRetentionInHours( - config.getInt(XTableSyncConfig.ONE_TABLE_TARGET_METADATA_RETENTION_HOURS)) .build(); Map results = - new ConversionController(hadoopConf).sync(perTableConfig, hudiConversionSourceProvider); + new ConversionController(hadoopConf).sync(conversionConfig, hudiConversionSourceProvider); String failingFormats = results.entrySet().stream() .filter( entry -> entry.getValue().getStatus().getStatusCode() != SyncResult.SyncStatusCode.SUCCESS) - .map(entry -> entry.getKey().toString()) + .map(Map.Entry::getKey) .collect(Collectors.joining(",")); if (!failingFormats.isEmpty()) { throw new HoodieException("Unable to sync to InternalTable for formats: " + failingFormats); diff --git a/xtable-hudi-support/xtable-hudi-support-extensions/src/test/java/org/apache/xtable/hudi/sync/TestXTableSyncTool.java b/xtable-hudi-support/xtable-hudi-support-extensions/src/test/java/org/apache/xtable/hudi/sync/TestXTableSyncTool.java index d44465c8e..4024674b8 100644 --- a/xtable-hudi-support/xtable-hudi-support-extensions/src/test/java/org/apache/xtable/hudi/sync/TestXTableSyncTool.java +++ b/xtable-hudi-support/xtable-hudi-support-extensions/src/test/java/org/apache/xtable/hudi/sync/TestXTableSyncTool.java @@ -98,7 +98,7 @@ public void testSync(String partitionPath) { writeBasicHudiTable(path, options); Properties properties = new Properties(); - properties.put(XTableSyncConfig.ONE_TABLE_FORMATS.key(), "iceberg,DELTA"); + properties.put(XTableSyncConfig.XTABLE_FORMATS.key(), "iceberg,DELTA"); properties.put(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key(), partitionPath); properties.put(HoodieSyncConfig.META_SYNC_BASE_PATH.key(), path); properties.putAll(options); diff --git a/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunSync.java b/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunSync.java index 7fb65c423..c84753de5 100644 --- a/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunSync.java +++ b/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunSync.java @@ -25,6 +25,8 @@ import java.nio.file.Paths; import java.util.List; import java.util.Map; +import java.util.Properties; +import java.util.stream.Collectors; import lombok.Data; import lombok.extern.log4j.Log4j2; @@ -44,17 +46,16 @@ import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; import com.google.common.annotations.VisibleForTesting; +import org.apache.xtable.conversion.ConversionConfig; import org.apache.xtable.conversion.ConversionController; import org.apache.xtable.conversion.ConversionSourceProvider; -import org.apache.xtable.conversion.PerTableConfig; -import org.apache.xtable.conversion.PerTableConfigImpl; -import org.apache.xtable.hudi.ConfigurationBasedPartitionSpecExtractor; -import org.apache.xtable.hudi.HudiSourceConfigImpl; +import org.apache.xtable.conversion.SourceTable; +import org.apache.xtable.conversion.TargetTable; +import org.apache.xtable.hudi.HudiSourceConfig; import org.apache.xtable.iceberg.IcebergCatalogConfig; import 
org.apache.xtable.model.storage.TableFormat; import org.apache.xtable.model.sync.SyncMode; import org.apache.xtable.reflection.ReflectionUtils; -import org.apache.xtable.utilities.RunSync.TableFormatConverters.ConversionConfig; /** * Provides a standalone runner for the sync process. See README.md for more details on how to run @@ -129,7 +130,7 @@ public static void main(String[] args) throws IOException { String sourceFormat = datasetConfig.sourceFormat; customConfig = getCustomConfigurations(cmd, CONVERTERS_CONFIG_PATH); TableFormatConverters tableFormatConverters = loadTableFormatConversionConfigs(customConfig); - ConversionConfig sourceConversionConfig = + TableFormatConverters.ConversionConfig sourceConversionConfig = tableFormatConverters.getTableFormatConverters().get(sourceFormat); if (sourceConversionConfig == null) { throw new IllegalArgumentException( @@ -140,7 +141,7 @@ public static void main(String[] args) throws IOException { String sourceProviderClass = sourceConversionConfig.conversionSourceProviderClass; ConversionSourceProvider conversionSourceProvider = ReflectionUtils.createInstanceOfClass(sourceProviderClass); - conversionSourceProvider.init(hadoopConf, sourceConversionConfig.configuration); + conversionSourceProvider.init(hadoopConf); List tableFormatList = datasetConfig.getTargetFormats(); ConversionController conversionController = new ConversionController(hadoopConf); @@ -149,24 +150,45 @@ public static void main(String[] args) throws IOException { "Running sync for basePath {} for following table formats {}", table.getTableBasePath(), tableFormatList); - PerTableConfig config = - PerTableConfigImpl.builder() - .tableBasePath(table.getTableBasePath()) - .tableName(table.getTableName()) + Properties sourceProperties = new Properties(); + if (table.getPartitionSpec() != null) { + sourceProperties.put( + HudiSourceConfig.PARTITION_FIELD_SPEC_CONFIG, table.getPartitionSpec()); + } + SourceTable sourceTable = + SourceTable.builder() + .name(table.getTableName()) + .basePath(table.getTableBasePath()) .namespace(table.getNamespace() == null ? null : table.getNamespace().split("\\.")) - .tableDataPath(table.getTableDataPath()) - .icebergCatalogConfig(icebergCatalogConfig) - .hudiSourceConfig( - HudiSourceConfigImpl.builder() - .partitionSpecExtractorClass( - ConfigurationBasedPartitionSpecExtractor.class.getName()) - .partitionFieldSpecConfig(table.getPartitionSpec()) - .build()) - .targetTableFormats(tableFormatList) + .dataPath(table.getTableDataPath()) + .catalogConfig(icebergCatalogConfig) + .additionalProperties(sourceProperties) + .formatName(sourceFormat) + .build(); + List targetTables = + tableFormatList.stream() + .map( + tableFormat -> + TargetTable.builder() + .name(table.getTableName()) + .basePath(table.getTableBasePath()) + .namespace( + table.getNamespace() == null + ? null + : table.getNamespace().split("\\.")) + .catalogConfig(icebergCatalogConfig) + .formatName(tableFormat) + .build()) + .collect(Collectors.toList()); + + ConversionConfig conversionConfig = + ConversionConfig.builder() + .sourceTable(sourceTable) + .targetTables(targetTables) .syncMode(SyncMode.INCREMENTAL) .build(); try { - conversionController.sync(config, conversionSourceProvider); + conversionController.sync(conversionConfig, conversionSourceProvider); } catch (Exception e) { log.error(String.format("Error running sync for %s", table.getTableBasePath()), e); }
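
Usage note: the pieces touched above compose as follows. This is a minimal sketch of the post-refactor flow mirrored by the updated LoadTest and RunSync changes: build one SourceTable, one TargetTable per output format, wrap them in a ConversionConfig, and pass that to ConversionController.sync. The class name, table name, base path, retention value, and the "level:VALUE" partition spec are placeholders, not values mandated by this patch.

    import java.time.Duration;
    import java.util.Arrays;
    import java.util.List;
    import java.util.Properties;
    import java.util.stream.Collectors;

    import org.apache.hadoop.conf.Configuration;

    import org.apache.xtable.conversion.ConversionConfig;
    import org.apache.xtable.conversion.ConversionController;
    import org.apache.xtable.conversion.SourceTable;
    import org.apache.xtable.conversion.TargetTable;
    import org.apache.xtable.hudi.HudiConversionSourceProvider;
    import org.apache.xtable.hudi.HudiSourceConfig;
    import org.apache.xtable.model.storage.TableFormat;
    import org.apache.xtable.model.sync.SyncMode;

    public class ConversionConfigSketch {
      public static void main(String[] args) {
        Configuration hadoopConf = new Configuration();

        // Providers are now initialized with only the Hadoop configuration.
        HudiConversionSourceProvider sourceProvider = new HudiConversionSourceProvider();
        sourceProvider.init(hadoopConf);

        // Hudi-specific options travel as Properties on the SourceTable;
        // the partition spec value here is illustrative.
        Properties sourceProperties = new Properties();
        sourceProperties.put(HudiSourceConfig.PARTITION_FIELD_SPEC_CONFIG, "level:VALUE");

        SourceTable sourceTable =
            SourceTable.builder()
                .name("my_table")                      // placeholder
                .formatName(TableFormat.HUDI)
                .basePath("file:///tmp/my_table")      // placeholder
                .additionalProperties(sourceProperties)
                .build();

        // One TargetTable per output format; metadataRetention replaces the old
        // targetMetadataRetentionInHours option and is a java.time.Duration.
        List<TargetTable> targetTables =
            Arrays.asList(TableFormat.ICEBERG, TableFormat.DELTA).stream()
                .map(
                    formatName ->
                        TargetTable.builder()
                            .name("my_table")
                            .formatName(formatName)
                            .basePath("file:///tmp/my_table")
                            .metadataRetention(Duration.ofHours(24 * 7))
                            .build())
                .collect(Collectors.toList());

        ConversionConfig conversionConfig =
            ConversionConfig.builder()
                .sourceTable(sourceTable)
                .targetTables(targetTables)
                .syncMode(SyncMode.INCREMENTAL)
                .build();

        new ConversionController(hadoopConf).sync(conversionConfig, sourceProvider);
      }
    }

SyncMode.FULL is wired the same way and is what the full-sync load test above exercises.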
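The Iceberg source tests follow the same pattern when only a source is needed: the provider is initialized with the Hadoop configuration alone, and getConversionSourceInstance now takes a SourceTable. A small sketch; the class name, table name, and location are placeholders.

    import org.apache.hadoop.conf.Configuration;

    import org.apache.xtable.conversion.SourceTable;
    import org.apache.xtable.iceberg.IcebergConversionSource;
    import org.apache.xtable.iceberg.IcebergConversionSourceProvider;
    import org.apache.xtable.model.storage.TableFormat;

    public class IcebergSourceSketch {
      public static void main(String[] args) {
        Configuration hadoopConf = new Configuration();
        hadoopConf.set("fs.defaultFS", "file:///");

        IcebergConversionSourceProvider sourceProvider = new IcebergConversionSourceProvider();
        sourceProvider.init(hadoopConf);

        // SourceTable carries the source format name directly instead of a list
        // of target formats.
        SourceTable sourceTable =
            SourceTable.builder()
                .name("catalog_sales")                   // placeholder
                .basePath("file:///tmp/catalog_sales")   // placeholder
                .formatName(TableFormat.ICEBERG)
                .build();

        IcebergConversionSource conversionSource =
            sourceProvider.getConversionSourceInstance(sourceTable);
        // conversionSource can now serve snapshots and commit backlogs to the sync pipeline.
      }
    }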
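For the Hudi sync-tool path, the renamed keys are set the way the updated TestXTableSyncTool does. A minimal sketch, assuming a Hudi table already exists at the base path and that the usual Hudi meta-sync properties (partition-path field and so on) are supplied alongside when the table is partitioned; the class name, path, table name, and retention value are placeholders.

    import java.util.Properties;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hudi.sync.common.HoodieSyncConfig;

    import org.apache.xtable.hudi.sync.XTableSyncConfig;
    import org.apache.xtable.hudi.sync.XTableSyncTool;

    public class XTableSyncToolSketch {
      public static void main(String[] args) {
        Properties props = new Properties();
        // Renamed from ONE_TABLE_FORMATS; key is "hoodie.xtable.formats.to.sync".
        props.put(XTableSyncConfig.XTABLE_FORMATS.key(), "ICEBERG,DELTA");
        // Renamed from ONE_TABLE_TARGET_METADATA_RETENTION_HOURS; now optional, and when it
        // is absent the tool passes a null retention so the target default applies.
        props.put(XTableSyncConfig.XTABLE_TARGET_METADATA_RETENTION_HOURS.key(), "72");
        props.put(HoodieSyncConfig.META_SYNC_BASE_PATH.key(), "file:///tmp/my_table"); // placeholder
        props.put("hoodie.table.name", "my_table");                                    // placeholder

        new XTableSyncTool(props, new Configuration()).syncHoodieTable();
      }
    }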