
[HUDI-7084] Fixing schema retrieval for table w/ no commits (apache#10069)

* Fixing schema retrieval for table w/ no commits

* fixing compilation failure
nsivabalan committed Nov 23, 2023
1 parent 82cb7fe commit 301f8d8
Showing 4 changed files with 44 additions and 24 deletions.
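At a glance, the patch teaches TableSchemaResolver to report a missing schema through Option instead of an exception, and teaches the write paths to treat an empty result as "nothing to validate yet". A minimal sketch of the caller-side pattern the four files converge on; variables such as metaClient, config and the validation flags come from the calling context and are not defined here:

// Sketch, condensed from the hunks below (not verbatim from any one file).
Option<Schema> tableSchema = new TableSchemaResolver(metaClient).getTableAvroSchemaIfPresent(false);
if (!tableSchema.isPresent()) {
  return;  // table has no commits yet, so there is no schema to validate against
}
Schema writerSchema = HoodieAvroUtils.createHoodieWriteSchema(config.getSchema());
AvroSchemaUtils.checkSchemaCompatible(
    HoodieAvroUtils.createHoodieWriteSchema(tableSchema.get()), writerSchema,
    shouldValidate, allowProjection, getDropPartitionColNames());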
@@ -833,8 +833,12 @@ private void validateSchema() throws HoodieUpsertException, HoodieInsertException {
 
     try {
       TableSchemaResolver schemaResolver = new TableSchemaResolver(getMetaClient());
+      Option<Schema> existingTableSchema = schemaResolver.getTableAvroSchemaIfPresent(false);
+      if (!existingTableSchema.isPresent()) {
+        return;
+      }
       Schema writerSchema = HoodieAvroUtils.createHoodieWriteSchema(config.getSchema());
-      Schema tableSchema = HoodieAvroUtils.createHoodieWriteSchema(schemaResolver.getTableAvroSchema(false));
+      Schema tableSchema = HoodieAvroUtils.createHoodieWriteSchema(existingTableSchema.get());
       AvroSchemaUtils.checkSchemaCompatible(tableSchema, writerSchema, shouldValidate, allowProjection, getDropPartitionColNames());
     } catch (Exception e) {
       throw new HoodieException("Failed to read schema/check compatibility for base path " + metaClient.getBasePath(), e);
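Before this change, a table with no completed commits made validateSchema() fail via the catch block above, because the resolver threw when it could not find a schema anywhere. With the Option-returning probe the same situation is detected up front and skipped. A small sketch of that probe on a freshly created table; hadoopConf and basePath are placeholders and the wrapper method is illustrative, not part of the patch:

// Sketch: probing the schema of a just-created table that has no commits yet.
boolean hasResolvableSchema(org.apache.hadoop.conf.Configuration hadoopConf, String basePath) {
  HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
      .setConf(hadoopConf)
      .setBasePath(basePath)
      .build();
  TableSchemaResolver resolver = new TableSchemaResolver(metaClient);
  // Empty when neither commit metadata nor data files yield a schema;
  // validateSchema() now returns early in exactly this case instead of failing.
  return resolver.getTableAvroSchemaIfPresent(false).isPresent();
}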
@@ -68,6 +68,7 @@
 import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Supplier;
 
 import static org.apache.hudi.avro.AvroSchemaUtils.appendFieldsToSchema;
 import static org.apache.hudi.avro.AvroSchemaUtils.containsFieldInSchema;
@@ -113,8 +114,12 @@ public TableSchemaResolver(HoodieTableMetaClient metaClient) {
     this.hasOperationField = Lazy.lazily(this::hasOperationField);
   }
 
-  public Schema getTableAvroSchemaFromDataFile() {
-    return convertParquetSchemaToAvro(getTableParquetSchemaFromDataFile());
+  public Schema getTableAvroSchemaFromDataFile() throws Exception {
+    return getTableAvroSchemaFromDataFileInternal().orElseThrow(schemaNotFoundError());
+  }
+
+  private Option<Schema> getTableAvroSchemaFromDataFileInternal() {
+    return getTableParquetSchemaFromDataFile().map(this::convertParquetSchemaToAvro);
   }
 
   /**
@@ -135,7 +140,7 @@ public Schema getTableAvroSchema() throws Exception {
    * @throws Exception
    */
   public Schema getTableAvroSchema(boolean includeMetadataFields) throws Exception {
-    return getTableAvroSchemaInternal(includeMetadataFields, Option.empty());
+    return getTableAvroSchemaInternal(includeMetadataFields, Option.empty()).orElseThrow(schemaNotFoundError());
   }
 
   /**
@@ -148,7 +153,8 @@ public Schema getTableAvroSchema(String timestamp) throws Exception {
         .filterCompletedInstants()
         .findInstantsBeforeOrEquals(timestamp)
         .lastInstant();
-    return getTableAvroSchemaInternal(metaClient.getTableConfig().populateMetaFields(), instant);
+    return getTableAvroSchemaInternal(metaClient.getTableConfig().populateMetaFields(), instant)
+        .orElseThrow(schemaNotFoundError());
   }
 
   /**
@@ -157,7 +163,7 @@ public Schema getTableAvroSchema(String timestamp) throws Exception {
    * @param instant as of which table's schema will be fetched
    */
   public Schema getTableAvroSchema(HoodieInstant instant, boolean includeMetadataFields) throws Exception {
-    return getTableAvroSchemaInternal(includeMetadataFields, Option.of(instant));
+    return getTableAvroSchemaInternal(includeMetadataFields, Option.of(instant)).orElseThrow(schemaNotFoundError());
   }
 
   /**
@@ -188,11 +194,15 @@ public MessageType getTableParquetSchema(boolean includeMetadataField) throws Exception {
    */
   @Deprecated
   public Schema getTableAvroSchemaWithoutMetadataFields() throws Exception {
-    return getTableAvroSchema(false);
+    return getTableAvroSchemaInternal(false, Option.empty()).orElseThrow(schemaNotFoundError());
   }
 
+  public Option<Schema> getTableAvroSchemaIfPresent(boolean includeMetadataFields) {
+    return getTableAvroSchemaInternal(includeMetadataFields, Option.empty());
+  }
+
-  private Schema getTableAvroSchemaInternal(boolean includeMetadataFields, Option<HoodieInstant> instantOpt) {
-    Schema schema =
+  private Option<Schema> getTableAvroSchemaInternal(boolean includeMetadataFields, Option<HoodieInstant> instantOpt) {
+    Option<Schema> schema =
         (instantOpt.isPresent()
             ? getTableSchemaFromCommitMetadata(instantOpt.get(), includeMetadataFields)
             : getTableSchemaFromLatestCommitMetadata(includeMetadataFields))
@@ -203,18 +213,18 @@ private Schema getTableAvroSchemaInternal(boolean includeMetadataFields, Option<HoodieInstant> instantOpt) {
                   ? HoodieAvroUtils.addMetadataFields(tableSchema, hasOperationField.get())
                   : tableSchema)
             )
-            .orElseGet(() -> {
-              Schema schemaFromDataFile = getTableAvroSchemaFromDataFile();
+            .or(() -> {
+              Option<Schema> schemaFromDataFile = getTableAvroSchemaFromDataFileInternal();
               return includeMetadataFields
                   ? schemaFromDataFile
-                  : HoodieAvroUtils.removeMetadataFields(schemaFromDataFile);
+                  : schemaFromDataFile.map(HoodieAvroUtils::removeMetadataFields);
             });
 
     // TODO partition columns have to be appended in all read-paths
-    if (metaClient.getTableConfig().shouldDropPartitionColumns()) {
+    if (metaClient.getTableConfig().shouldDropPartitionColumns() && schema.isPresent()) {
       return metaClient.getTableConfig().getPartitionFields()
-          .map(partitionFields -> appendPartitionColumns(schema, Option.ofNullable(partitionFields)))
-          .orElse(schema);
+          .map(partitionFields -> appendPartitionColumns(schema.get(), Option.ofNullable(partitionFields)))
+          .or(() -> schema);
     }
 
     return schema;
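The heart of the change is the fallback chain above: the data-file branch now contributes an empty Option instead of throwing, and only the public non-Option getters turn absence into schemaNotFoundError(). A standalone analogue of that shape, written against java.util.Optional purely for illustration (none of this is code from the patch):

import java.util.Optional;
import java.util.function.Supplier;

class SchemaLookupSketch {
  // Illustrative stand-ins for the resolver's two schema sources.
  static Optional<String> fromCommitMetadata() { return Optional.empty(); }
  static Optional<String> fromDataFile() { return Optional.empty(); }

  static Supplier<IllegalArgumentException> notFound() {
    return () -> new IllegalArgumentException("No schema found");
  }

  // Mirrors getTableAvroSchemaInternal: commit metadata first, then data files,
  // with the Option threaded through rather than an exception thrown mid-chain.
  static Optional<String> lookup() {
    return fromCommitMetadata().or(SchemaLookupSketch::fromDataFile);
  }

  public static void main(String[] args) {
    try {
      // Only the outermost caller decides that "absent" is an error.
      System.out.println(lookup().orElseThrow(notFound()));
    } catch (IllegalArgumentException e) {
      System.out.println("empty table: " + e.getMessage());
    }
  }
}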
@@ -257,7 +267,7 @@ private Option<Schema> getTableSchemaFromCommitMetadata(HoodieInstant instant, boolean includeMetadataFields) {
   /**
    * Fetches the schema for a table from any the table's data files
    */
-  private MessageType getTableParquetSchemaFromDataFile() {
+  private Option<MessageType> getTableParquetSchemaFromDataFile() {
     Option<Pair<HoodieInstant, HoodieCommitMetadata>> instantAndCommitMetadata = getLatestCommitMetadataWithValidData();
     try {
       switch (metaClient.getTableType()) {
@@ -270,10 +280,11 @@ private MessageType getTableParquetSchemaFromDataFile() {
         if (instantAndCommitMetadata.isPresent()) {
           HoodieCommitMetadata commitMetadata = instantAndCommitMetadata.get().getRight();
           Iterator<String> filePaths = commitMetadata.getFileIdAndFullPaths(metaClient.getBasePathV2()).values().iterator();
-          return fetchSchemaFromFiles(filePaths);
+          return Option.of(fetchSchemaFromFiles(filePaths));
         } else {
-          throw new IllegalArgumentException("Could not find any data file written for commit, "
+          LOG.warn("Could not find any data file written for commit, "
               + "so could not get schema for table " + metaClient.getBasePath());
+          return Option.empty();
         }
       default:
         LOG.error("Unknown table type " + metaClient.getTableType());
@@ -308,7 +319,7 @@ private MessageType convertAvroSchemaToParquet(Schema schema) {
    */
   public Option<Schema> getTableAvroSchemaFromLatestCommit(boolean includeMetadataFields) throws Exception {
     if (metaClient.isTimelineNonEmpty()) {
-      return Option.of(getTableAvroSchemaInternal(includeMetadataFields, Option.empty()));
+      return getTableAvroSchemaInternal(includeMetadataFields, Option.empty());
     }
 
     return Option.empty();
@@ -569,4 +580,8 @@ public static Schema appendPartitionColumns(Schema dataSchema, Option<String[]> partitionFields) {
 
     return dataSchema;
   }
+
+  private Supplier<Exception> schemaNotFoundError() {
+    return () -> new IllegalArgumentException("No schema found for table at " + metaClient.getBasePathV2().toString());
+  }
 }
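Taken together, the resolver now offers two flavours of the same lookup: the existing getters still return a plain Schema but fail with the IllegalArgumentException built by schemaNotFoundError() when the table has no schema, while getTableAvroSchemaIfPresent lets callers branch without exception handling. A short sketch contrasting the two; the describeSchema wrapper is illustrative, not part of the patch:

static String describeSchema(TableSchemaResolver resolver) {
  // Lenient flavour: empty Option on a table with no commits or data files.
  Option<Schema> maybeSchema = resolver.getTableAvroSchemaIfPresent(false);
  if (!maybeSchema.isPresent()) {
    return "no schema yet";
  }
  try {
    // Strict flavour: same lookup, but absence would surface as an exception.
    return resolver.getTableAvroSchema(false).getFullName();
  } catch (Exception e) {  // getTableAvroSchema still declares throws Exception
    return "unexpected: " + e.getMessage();
  }
}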
@@ -141,7 +141,7 @@ public static void setPreCombineField(Configuration conf, HoodieTableMetaClient metaClient) {
    * @param conf The configuration
    * @param metaClient The meta client
    */
-  public static void inferChangelogMode(Configuration conf, HoodieTableMetaClient metaClient) {
+  public static void inferChangelogMode(Configuration conf, HoodieTableMetaClient metaClient) throws Exception {
     TableSchemaResolver tableSchemaResolver = new TableSchemaResolver(metaClient);
     Schema tableAvroSchema = tableSchemaResolver.getTableAvroSchemaFromDataFile();
     if (tableAvroSchema.getField(HoodieRecord.OPERATION_METADATA_FIELD) != null) {
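Because getTableAvroSchemaFromDataFile() now declares throws Exception, this Flink helper has to declare it as well, which is the "fixing compilation failure" part of the commit message. A hedged sketch of how a call site outside this diff might absorb the new checked exception, either by propagating it or by wrapping it; the surrounding method name is illustrative:

// Sketch only: configureCatalogTable is not a method from the Hudi Flink module.
static void configureCatalogTable(Configuration conf, HoodieTableMetaClient metaClient) {
  try {
    inferChangelogMode(conf, metaClient);  // now a checked-exception call
  } catch (Exception e) {
    throw new HoodieException("Failed to infer changelog mode for " + metaClient.getBasePath(), e);
  }
}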
@@ -1152,10 +1152,11 @@ private Schema getSchemaForWriteConfig(Schema targetSchema) {
         .build();
     int totalCompleted = meta.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().countInstants();
     if (totalCompleted > 0) {
-      try {
-        TableSchemaResolver schemaResolver = new TableSchemaResolver(meta);
-        newWriteSchema = schemaResolver.getTableAvroSchema(false);
-      } catch (IllegalArgumentException e) {
+      TableSchemaResolver schemaResolver = new TableSchemaResolver(meta);
+      Option<Schema> tableSchema = schemaResolver.getTableAvroSchemaIfPresent(false);
+      if (tableSchema.isPresent()) {
+        newWriteSchema = tableSchema.get();
+      } else {
         LOG.warn("Could not fetch schema from table. Falling back to using target schema from schema provider");
       }
     }
