Skip to content

Commit

Permalink
[HUDI-4003] Try to read all the log file to parse schema (#5473)
Browse files Browse the repository at this point in the history
  • Loading branch information
lanyuanxiaoyao authored May 10, 2022
1 parent 6fd21d0 commit 4258a71
Showing 1 changed file with 19 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

import static org.apache.hudi.avro.AvroSchemaUtils.appendFieldsToSchema;
Expand Down Expand Up @@ -98,8 +99,8 @@ private MessageType getTableParquetSchemaFromDataFile() {
// For COW table, the file has data written must be in parquet or orc format currently.
if (instantAndCommitMetadata.isPresent()) {
HoodieCommitMetadata commitMetadata = instantAndCommitMetadata.get().getRight();
String filePath = commitMetadata.getFileIdAndFullPaths(metaClient.getBasePath()).values().stream().findAny().get();
return readSchemaFromBaseFile(filePath);
Iterator<String> filePaths = commitMetadata.getFileIdAndFullPaths(metaClient.getBasePath()).values().iterator();
return fetchSchemaFromFiles(filePaths);
} else {
throw new IllegalArgumentException("Could not find any data file written for commit, "
+ "so could not get schema for table " + metaClient.getBasePath());
Expand All @@ -109,13 +110,8 @@ private MessageType getTableParquetSchemaFromDataFile() {
// Determine the file format based on the file name, and then extract schema from it.
if (instantAndCommitMetadata.isPresent()) {
HoodieCommitMetadata commitMetadata = instantAndCommitMetadata.get().getRight();
String filePath = commitMetadata.getFileIdAndFullPaths(metaClient.getBasePath()).values().stream().findAny().get();
if (filePath.contains(HoodieFileFormat.HOODIE_LOG.getFileExtension())) {
// this is a log file
return readSchemaFromLogFile(new Path(filePath));
} else {
return readSchemaFromBaseFile(filePath);
}
Iterator<String> filePaths = commitMetadata.getFileIdAndFullPaths(metaClient.getBasePath()).values().iterator();
return fetchSchemaFromFiles(filePaths);
} else {
throw new IllegalArgumentException("Could not find any data file written for commit, "
+ "so could not get schema for table " + metaClient.getBasePath());
Expand All @@ -129,6 +125,20 @@ private MessageType getTableParquetSchemaFromDataFile() {
}
}

private MessageType fetchSchemaFromFiles(Iterator<String> filePaths) throws IOException {
MessageType type = null;
while (filePaths.hasNext() && type == null) {
String filePath = filePaths.next();
if (filePath.contains(HoodieFileFormat.HOODIE_LOG.getFileExtension())) {
// this is a log file
type = readSchemaFromLogFile(new Path(filePath));
} else {
type = readSchemaFromBaseFile(filePath);
}
}
return type;
}

private MessageType readSchemaFromBaseFile(String filePath) throws IOException {
if (filePath.contains(HoodieFileFormat.PARQUET.getFileExtension())) {
// this is a parquet file
Expand Down

0 comments on commit 4258a71

Please sign in to comment.