[HUDI-4465] Optimizing file-listing sequence of Metadata Table #6016

Merged: 27 commits, merged on Sep 9, 2022

Commits (27)
885619c
Replaced string path w/ Hadoop's `Path` to avoid costly conversions i…
Jun 25, 2022
f2ad090
Avoid fetching `FileSystem.get` since it's unused;
Jun 25, 2022
a6c3130
Cache `DateFormatter`/`TimestampFormatter` to avoid loading these t/h…
Jun 25, 2022
c812871
Avoid re-instantiating `HoodieTableMetadata` twice w/in `BaseHoodieTa…
Jun 27, 2022
6c96323
Fixing compilation
Jun 28, 2022
a3945f2
Avoid creating costly Hadoop's `Path` object wherever possible, repla…
Jun 27, 2022
8bc2456
Fixing compilation
Jun 27, 2022
8e10cb8
Avoid loading defaults in Hadoop conf when init-ing HFile reader;
Jun 30, 2022
803facf
Short-circuit fetching partition path for non-partitioned tables (sim…
Jun 30, 2022
fcc7e95
Cleaned up `BaseTableMetadata`
Jun 30, 2022
51f27e6
Avoid looking up `FileSystem` for every partition when listing partit…
Jun 30, 2022
bfe5701
`lint`
Jun 30, 2022
045c526
Revert accidental change
Jul 5, 2022
350c16e
Reverting experimental changes
Jul 25, 2022
6508ce9
Fixing compilation (for Spark 2.4)
Jul 26, 2022
2ffe683
Make `CachingPath` non `Serializable`;
Jul 26, 2022
de08ca3
Fixed tests
Jul 27, 2022
f88229b
Fixed invalid cast in `HoodieTableMetaClient`
Jul 27, 2022
068a110
Fixing tests w/ invalid configuration
Jul 27, 2022
3db948f
Fixing more tests w/ invalid configuration
Jul 27, 2022
56d5176
Fixed configs in IT to properly mirror configs for DS cases
Jul 28, 2022
409066f
Tidying up
Jul 28, 2022
95ce817
Carve out exception for MT to avoid treating it as non-partitioned one
Jul 28, 2022
d448c74
Fixed `SimpleKeyGenerator` to assert that configured primary-key and
Sep 3, 2022
ea2c947
Fixed improperly configured test
Sep 3, 2022
8be611a
Fixed invalid partition-path configs in `TestHoodieDeltaStreamer` suite
Sep 3, 2022
46e53b5
Rebased `TimeTravelQuery` test to use Simple KG instead
Sep 6, 2022
4 changes: 4 additions & 0 deletions docker/demo/config/dfs-source.properties
@@ -19,6 +19,10 @@ include=base.properties
# Key fields, for kafka example
hoodie.datasource.write.recordkey.field=key
hoodie.datasource.write.partitionpath.field=date
# NOTE: We have to duplicate configuration since this is being used
# w/ both Spark and DeltaStreamer
hoodie.table.recordkey.fields=key
hoodie.table.partition.fields=date
# Schema provider props (change to absolute path based on your installation)
hoodie.deltastreamer.schemaprovider.source.schema.file=/var/demo/config/schema.avsc
hoodie.deltastreamer.schemaprovider.target.schema.file=/var/demo/config/schema.avsc
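As a side note on the duplication explained in the NOTE above: the write path (Spark datasource / DeltaStreamer) and the table-level config read the record-key and partition fields from different property keys, so a properties file shared by both has to set both families. Below is a tiny hypothetical sketch of the equivalent programmatic setup; the property keys are taken from this diff, everything else is illustrative.

import java.util.Properties;

public class DuplicatedKeyConfigSketch {
  public static void main(String[] args) {
    Properties props = new Properties();
    // Writer-side (datasource) keys
    props.setProperty("hoodie.datasource.write.recordkey.field", "key");
    props.setProperty("hoodie.datasource.write.partitionpath.field", "date");
    // Table-level keys, mirroring the values above as in this PR's demo config
    props.setProperty("hoodie.table.recordkey.fields", "key");
    props.setProperty("hoodie.table.partition.fields", "date");

    props.forEach((k, v) -> System.out.println(k + "=" + v));
  }
}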
@@ -1006,21 +1006,21 @@ protected void compactIfNecessary(BaseHoodieWriteClient writeClient, String inst
// finish off any pending compactions if any from previous attempt.
writeClient.runAnyPendingCompactions();

String latestDeltacommitTime = metadataMetaClient.reloadActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants().lastInstant()
String latestDeltaCommitTime = metadataMetaClient.reloadActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants().lastInstant()
.get().getTimestamp();
List<HoodieInstant> pendingInstants = dataMetaClient.reloadActiveTimeline().filterInflightsAndRequested()
.findInstantsBefore(instantTime).getInstants().collect(Collectors.toList());

if (!pendingInstants.isEmpty()) {
LOG.info(String.format("Cannot compact metadata table as there are %d inflight instants before latest deltacommit %s: %s",
pendingInstants.size(), latestDeltacommitTime, Arrays.toString(pendingInstants.toArray())));
pendingInstants.size(), latestDeltaCommitTime, Arrays.toString(pendingInstants.toArray())));
return;
}

// Trigger compaction with suffixes based on the same instant time. This ensures that any future
// delta commits synced over will not have an instant time lesser than the last completed instant on the
// metadata table.
final String compactionInstantTime = latestDeltacommitTime + "001";
final String compactionInstantTime = latestDeltaCommitTime + "001";
if (writeClient.scheduleCompactionAtInstant(compactionInstantTime, Option.empty())) {
writeClient.compact(compactionInstantTime);
}
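The comment above explains the instant-time suffixing. As a concrete illustration, the following standalone sketch (not Hudi code; the timestamps are made up) shows why appending "001" to the latest completed deltacommit time yields a compaction instant that sorts after that deltacommit but before any later instant synced over from the data table, given that instant times are compared as strings.

public class CompactionInstantSuffixSketch {
  public static void main(String[] args) {
    // Hypothetical instant times in Hudi's timestamp format
    String latestDeltaCommitTime = "20220909101530";
    String compactionInstantTime = latestDeltaCommitTime + "001"; // "20220909101530001"
    String nextSyncedDeltaCommit = "20220909101531"; // any later instant from the data table

    // Lexicographic comparison keeps the ordering:
    //   latestDeltaCommitTime < compactionInstantTime < nextSyncedDeltaCommit
    System.out.println(latestDeltaCommitTime.compareTo(compactionInstantTime) < 0); // true
    System.out.println(compactionInstantTime.compareTo(nextSyncedDeltaCommit) < 0); // true
  }
}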
@@ -77,7 +77,6 @@ public abstract class BuiltinKeyGenerator extends BaseKeyGenerator implements Sp
protected static final UTF8String NULL_RECORD_KEY_PLACEHOLDER_UTF8 = UTF8String.fromString(NULL_RECORDKEY_PLACEHOLDER);
protected static final UTF8String EMPTY_RECORD_KEY_PLACEHOLDER_UTF8 = UTF8String.fromString(EMPTY_RECORDKEY_PLACEHOLDER);


protected transient volatile SparkRowConverter rowConverter;
protected transient volatile SparkRowAccessor rowAccessor;

@@ -20,6 +20,7 @@

import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.InternalRow;
@@ -46,6 +47,12 @@ public SimpleKeyGenerator(TypedProperties props) {

SimpleKeyGenerator(TypedProperties props, String recordKeyField, String partitionPathField) {
super(props);
// Make sure key-generator is configured properly
ValidationUtils.checkArgument(recordKeyField == null || !recordKeyField.isEmpty(),
"Record key field has to be non-empty!");
ValidationUtils.checkArgument(partitionPathField == null || !partitionPathField.isEmpty(),
"Partition path field has to be non-empty!");

this.recordKeyFields = recordKeyField == null ? Collections.emptyList() : Collections.singletonList(recordKeyField);
this.partitionPathFields = partitionPathField == null ? Collections.emptyList() : Collections.singletonList(partitionPathField);
this.simpleAvroKeyGenerator = new SimpleAvroKeyGenerator(props, recordKeyField, partitionPathField);

Member:

Should the validation message be more user-friendly? For example:
"Partition path field has to be non-empty! For a non-partitioned table, set the key generator class to NonPartitionedKeyGenerator".
Also, why are these validations only added for SimpleKeyGenerator? Why not the other key generators as well?

Contributor (Author):

I don't think it makes sense to put suggestions into exception messages -- an exception message should focus on the problem that triggered it rather than on potential remedies. An empty partition-path field is usually a sign of misconfiguration: there is no default value, so it means the user passed "" explicitly.

Member:

Fair enough. Should we add these validations to the other key generators as well?

Member:

Discussed offline; it will be taken up separately. @alexeykudinkin, in case you have a JIRA, please link it here. For SimpleKeyGenerator we need the validation because some tests were misconfigured and passed "" as the partition field.
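To make the misconfiguration discussed above concrete, here is a small hypothetical snippet (not part of this PR; the class name and the exact flow through the public constructor are assumptions) showing the input the new checks reject: a partition path field explicitly configured as the empty string. With the validation in place, construction should fail fast with an IllegalArgumentException carrying the message above instead of silently treating the table as non-partitioned.

// Hypothetical usage sketch illustrating what the new checks catch.
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.keygen.SimpleKeyGenerator;

public class SimpleKeyGeneratorValidationSketch {
  public static void main(String[] args) {
    TypedProperties props = new TypedProperties();
    props.setProperty("hoodie.datasource.write.recordkey.field", "key");
    // Explicitly passing "" is a misconfiguration; there is no default partition-path value.
    props.setProperty("hoodie.datasource.write.partitionpath.field", "");

    try {
      new SimpleKeyGenerator(props);
    } catch (IllegalArgumentException e) {
      // Expected: "Partition path field has to be non-empty!"
      System.out.println(e.getMessage());
    }
  }
}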
@@ -26,10 +26,10 @@ import org.apache.spark.sql.types.DataType

trait SparkParsePartitionUtil extends Serializable {

def parsePartition(
path: Path,
typeInference: Boolean,
basePaths: Set[Path],
userSpecifiedDataTypes: Map[String, DataType],
timeZone: TimeZone): InternalRow
def parsePartition(path: Path,
typeInference: Boolean,
basePaths: Set[Path],
userSpecifiedDataTypes: Map[String, DataType],
timeZone: TimeZone,
validatePartitionValues: Boolean = false): InternalRow
}
@@ -85,7 +85,7 @@
/**
* Create the SparkParsePartitionUtil.
*/
def createSparkParsePartitionUtil(conf: SQLConf): SparkParsePartitionUtil
def getSparkParsePartitionUtil: SparkParsePartitionUtil

/**
* ParserInterface#parseMultipartIdentifier is supported since spark3, for spark2 this should not be called.
@@ -97,7 +97,7 @@ private void setUp(boolean populateMetaFields, boolean partitioned) throws Excep
initTestDataGenerator(new String[] {""});
}
initFileSystem();
Properties props = populateMetaFields ? new Properties() : getPropertiesForKeyGen();
Properties props = getPropertiesForKeyGen(populateMetaFields);
props.setProperty(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), "_row_key");
metaClient = HoodieTestUtils.init(hadoopConf, basePath, HoodieTableType.MERGE_ON_READ, props);
config = getConfigBuilder()
@@ -36,6 +36,7 @@
import org.apache.hudi.common.table.view.TableFileSystemView.BaseFileOnlyView;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.testutils.Transformations;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieClusteringConfig;
import org.apache.hudi.config.HoodieWriteConfig;
@@ -61,7 +62,6 @@
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.storage.StorageLevel;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
@@ -87,9 +87,8 @@ public class TestHoodieMergeOnReadTable extends SparkClientFunctionalTestHarness
private HoodieTableMetaClient metaClient;
private HoodieTestDataGenerator dataGen;

@BeforeEach
void setUp() throws IOException {
Properties properties = new Properties();
void setUp(Properties props) throws IOException {
Properties properties = CollectionUtils.copy(props);
properties.setProperty(HoodieTableConfig.BASE_FILE_FORMAT.key(), HoodieTableConfig.BASE_FILE_FORMAT.defaultValue().toString());
metaClient = getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, properties);
dataGen = new HoodieTestDataGenerator();
@@ -99,6 +98,9 @@ void setUp() throws IOException {
@Test
public void testMetadataAggregateFromWriteStatus() throws Exception {
HoodieWriteConfig cfg = getConfigBuilder(false).withWriteStatusClass(MetadataMergeWriteStatus.class).build();

setUp(cfg.getProps());

try (SparkRDDWriteClient client = getHoodieWriteClient(cfg);) {

String newCommitTime = "001";
@@ -125,6 +127,9 @@ public void testUpsertPartitioner(boolean populateMetaFields) throws Exception {
HoodieWriteConfig.Builder cfgBuilder = getConfigBuilder(true);
addConfigsForPopulateMetaFields(cfgBuilder, populateMetaFields);
HoodieWriteConfig cfg = cfgBuilder.build();

setUp(cfg.getProps());

try (SparkRDDWriteClient client = getHoodieWriteClient(cfg);) {

/**
@@ -213,6 +218,8 @@ public void testLogFileCountsAfterCompaction(boolean preserveCommitMeta) throws
addConfigsForPopulateMetaFields(cfgBuilder, populateMetaFields);
HoodieWriteConfig config = cfgBuilder.build();

setUp(config.getProps());

try (SparkRDDWriteClient writeClient = getHoodieWriteClient(config);) {
String newCommitTime = "100";
writeClient.startCommitWithTime(newCommitTime);
@@ -302,6 +309,8 @@ public void testMetadataStatsOnCommit(Boolean rollbackUsingMarkers) throws Excep
HoodieWriteConfig cfg = getConfigBuilder(false, rollbackUsingMarkers, IndexType.INMEMORY)
.withAutoCommit(false).build();

setUp(cfg.getProps());

try (SparkRDDWriteClient client = getHoodieWriteClient(cfg);) {
HoodieTable table = HoodieSparkTable.create(cfg, context(), metaClient);

@@ -381,6 +390,9 @@ public void testMetadataStatsOnCommit(Boolean rollbackUsingMarkers) throws Excep
@Test
public void testRollingStatsWithSmallFileHandling() throws Exception {
HoodieWriteConfig cfg = getConfigBuilder(false, IndexType.INMEMORY).withAutoCommit(false).build();

setUp(cfg.getProps());

try (SparkRDDWriteClient client = getHoodieWriteClient(cfg);) {
Map<String, Long> fileIdToInsertsMap = new HashMap<>();
Map<String, Long> fileIdToUpsertsMap = new HashMap<>();
@@ -497,6 +509,9 @@ public void testRollingStatsWithSmallFileHandling() throws Exception {
@Test
public void testHandleUpdateWithMultiplePartitions() throws Exception {
HoodieWriteConfig cfg = getConfig(true);

setUp(cfg.getProps());

try (SparkRDDWriteClient client = getHoodieWriteClient(cfg);) {

/**
@@ -578,6 +593,9 @@ public void testReleaseResource() throws Exception {
HoodieWriteConfig.Builder builder = getConfigBuilder(true);
builder.withReleaseResourceEnabled(true);
builder.withAutoCommit(false);

setUp(builder.build().getProps());

/**
* Write 1 (test when RELEASE_RESOURCE_ENABLE is true)
*/
@@ -54,6 +54,7 @@
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.stream.Collectors;
import java.util.stream.Stream;

@@ -98,8 +99,13 @@ public void testWriteDuringCompaction() throws IOException {
.withLayoutConfig(HoodieLayoutConfig.newBuilder()
.withLayoutType(HoodieStorageLayout.LayoutType.BUCKET.name())
.withLayoutPartitioner(SparkBucketIndexPartitioner.class.getName()).build())
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BUCKET).withBucketNum("1").build()).build();
metaClient = getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, config.getProps());
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BUCKET).withBucketNum("1").build())
.build();

Properties props = getPropertiesForKeyGen(true);
Member:

Pass HoodieTableConfig.POPULATE_META_FIELDS.defaultValue() instead of hard-coding true?

props.putAll(config.getProps());

metaClient = getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, props);
client = getHoodieWriteClient(config);

// write data and commit
@@ -138,8 +144,13 @@ public void testWriteLogDuringCompaction(boolean enableMetadataTable, boolean en
.withLayoutConfig(HoodieLayoutConfig.newBuilder()
.withLayoutType(HoodieStorageLayout.LayoutType.BUCKET.name())
.withLayoutPartitioner(SparkBucketIndexPartitioner.class.getName()).build())
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BUCKET).withBucketNum("1").build()).build();
metaClient = getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, config.getProps());
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BUCKET).withBucketNum("1").build())
.build();

Properties props = getPropertiesForKeyGen(true);
props.putAll(config.getProps());

metaClient = getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, props);
client = getHoodieWriteClient(config);

final List<HoodieRecord> records = dataGen.generateInserts("001", 100);
@@ -82,7 +82,7 @@ void setUp() {
public void testIncrementalReadsWithCompaction() throws Exception {
final String partitionPath = "2020/02/20"; // use only one partition for this test
final HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator(new String[] { partitionPath });
Properties props = new Properties();
Properties props = getPropertiesForKeyGen(true);
props.setProperty(HoodieTableConfig.BASE_FILE_FORMAT.key(), HoodieFileFormat.PARQUET.toString());
HoodieTableMetaClient metaClient = getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, props);
HoodieWriteConfig cfg = getConfigBuilder(true).build();
@@ -41,6 +41,7 @@
import org.apache.hudi.common.table.view.TableFileSystemView;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.testutils.HoodieTestTable;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieCleanConfig;
@@ -155,7 +156,7 @@ void testRollbackWithDeltaAndCompactionCommit(boolean rollbackUsingMarkers) thro
addConfigsForPopulateMetaFields(cfgBuilder, true);
HoodieWriteConfig cfg = cfgBuilder.build();

Properties properties = new Properties();
Properties properties = CollectionUtils.copy(cfg.getProps());
properties.setProperty(HoodieTableConfig.BASE_FILE_FORMAT.key(), HoodieTableConfig.BASE_FILE_FORMAT.defaultValue().toString());
HoodieTableMetaClient metaClient = getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, properties);

@@ -327,7 +328,7 @@ void testMultiRollbackWithDeltaAndCompactionCommit() throws Exception {
addConfigsForPopulateMetaFields(cfgBuilder, populateMetaFields);
HoodieWriteConfig cfg = cfgBuilder.build();

Properties properties = populateMetaFields ? new Properties() : getPropertiesForKeyGen();
Properties properties = getPropertiesForKeyGen(populateMetaFields);
properties.setProperty(HoodieTableConfig.BASE_FILE_FORMAT.key(), HoodieTableConfig.BASE_FILE_FORMAT.defaultValue().toString());
HoodieTableMetaClient metaClient = getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, properties);

@@ -606,8 +607,10 @@ void testMORTableRestore(boolean restoreAfterCompaction) throws Exception {
.withMarkersType(MarkerType.DIRECT.name());
HoodieWriteConfig cfg = cfgBuilder.build();

Properties properties = new Properties();
Properties properties = getPropertiesForKeyGen(true);
properties.putAll(cfg.getProps());
properties.setProperty(HoodieTableConfig.BASE_FILE_FORMAT.key(), HoodieTableConfig.BASE_FILE_FORMAT.defaultValue().toString());

HoodieTableMetaClient metaClient = getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, properties);

try (final SparkRDDWriteClient client = getHoodieWriteClient(cfg)) {
@@ -341,8 +341,12 @@ protected int incrementTimelineServicePortToUse() {
}

protected Properties getPropertiesForKeyGen() {
return getPropertiesForKeyGen(false);
}

protected Properties getPropertiesForKeyGen(boolean populateMetaFields) {
Properties properties = new Properties();
properties.put(HoodieTableConfig.POPULATE_META_FIELDS.key(), "false");
properties.put(HoodieTableConfig.POPULATE_META_FIELDS.key(), String.valueOf(populateMetaFields));
properties.put("hoodie.datasource.write.recordkey.field", "_row_key");
properties.put("hoodie.datasource.write.partitionpath.field", "partition_path");
properties.put(HoodieTableConfig.RECORDKEY_FIELDS.key(), "_row_key");
@@ -160,7 +160,7 @@ public HoodieTableMetaClient getHoodieMetaClient(Configuration hadoopConf, Strin
}

public HoodieTableMetaClient getHoodieMetaClient(Configuration hadoopConf, String basePath) throws IOException {
return getHoodieMetaClient(hadoopConf, basePath, new Properties());
return getHoodieMetaClient(hadoopConf, basePath, getPropertiesForKeyGen(true));
}

@Override
@@ -310,8 +310,12 @@ protected FileStatus[] listAllBaseFilesInPath(HoodieTable table) throws IOExcept
}

protected Properties getPropertiesForKeyGen() {
return getPropertiesForKeyGen(false);
}

protected Properties getPropertiesForKeyGen(boolean populateMetaFields) {
Properties properties = new Properties();
properties.put(HoodieTableConfig.POPULATE_META_FIELDS.key(), "false");
properties.put(HoodieTableConfig.POPULATE_META_FIELDS.key(), String.valueOf(populateMetaFields));
properties.put("hoodie.datasource.write.recordkey.field", "_row_key");
properties.put("hoodie.datasource.write.partitionpath.field", "partition_path");
properties.put(HoodieTableConfig.RECORDKEY_FIELDS.key(), "_row_key");
@@ -321,9 +325,9 @@ protected Properties getPropertiesForKeyGen() {
}

protected void addConfigsForPopulateMetaFields(HoodieWriteConfig.Builder configBuilder, boolean populateMetaFields) {
configBuilder.withProperties(getPropertiesForKeyGen(populateMetaFields));
if (!populateMetaFields) {
configBuilder.withProperties(getPropertiesForKeyGen())
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.SIMPLE).build());
configBuilder.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.SIMPLE).build());
}
}
