
Commit

fix ci
xiarixiaoyao committed Oct 1, 2022
1 parent 0445bf7 commit ca9b8fb
Showing 3 changed files with 9 additions and 17 deletions.
```diff
@@ -133,8 +133,7 @@ public static List<GenericRecord> getRecordsUsingInputFormat(Configuration conf,
       hiveColumnTypes = rawHiveColumnTypes;
     }
 
-    JobConf newConf = new JobConf(jobConf);
-    setPropsForInputFormat(inputFormat, newConf, schema, hiveColumnTypes, projectCols, projectedColumns, populateMetaFields);
+    setPropsForInputFormat(inputFormat, jobConf, schema, hiveColumnTypes, projectCols, projectedColumns, populateMetaFields);
     final List<Field> fields;
     if (projectCols) {
       fields = schema.getFields().stream().filter(f -> projectedColumns.contains(f.name()))
@@ -148,11 +147,11 @@ public static List<GenericRecord> getRecordsUsingInputFormat(Configuration conf,
 
     List<GenericRecord> records = new ArrayList<>();
     try {
-      FileInputFormat.setInputPaths(newConf, String.join(",", inputPaths));
-      InputSplit[] splits = inputFormat.getSplits(newConf, inputPaths.size());
+      FileInputFormat.setInputPaths(jobConf, String.join(",", inputPaths));
+      InputSplit[] splits = inputFormat.getSplits(jobConf, inputPaths.size());
 
       for (InputSplit split : splits) {
-        RecordReader recordReader = inputFormat.getRecordReader(split, newConf, null);
+        RecordReader recordReader = inputFormat.getRecordReader(split, jobConf, null);
         Object key = recordReader.createKey();
         ArrayWritable writable = (ArrayWritable) recordReader.createValue();
         while (recordReader.next(key, writable)) {
```
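For context: this hunk drops the private `JobConf` copy, so `setPropsForInputFormat`, `getSplits`, and `getRecordReader` all see the same configuration object the caller passed in. Below is a minimal, hypothetical sketch of the same classic mapred-API read loop, using a plain `TextInputFormat` and made-up paths in place of the Hive input format this utility actually receives:

```java
import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;

public class MapredReadSketch {
  public static void main(String[] args) throws Exception {
    // One shared JobConf: properties set on it are visible to both
    // getSplits() and getRecordReader(), which is the point of dropping the copy.
    JobConf jobConf = new JobConf();
    List<String> inputPaths = Arrays.asList("/tmp/data/part-0", "/tmp/data/part-1"); // hypothetical

    TextInputFormat inputFormat = new TextInputFormat();
    inputFormat.configure(jobConf);

    FileInputFormat.setInputPaths(jobConf, String.join(",", inputPaths));
    InputSplit[] splits = inputFormat.getSplits(jobConf, inputPaths.size());

    for (InputSplit split : splits) {
      RecordReader<LongWritable, Text> reader =
          inputFormat.getRecordReader(split, jobConf, Reporter.NULL);
      LongWritable key = reader.createKey();
      Text value = reader.createValue();
      while (reader.next(key, value)) {
        System.out.println(value); // consume the record
      }
      reader.close();
    }
  }
}
```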
```diff
@@ -69,12 +69,12 @@
  */
 public class HiveAvroSerializer {
 
+  private static final Logger LOG = LogManager.getLogger(HiveAvroSerializer.class);
+
   private final List<String> columnNames;
   private final List<TypeInfo> columnTypes;
   private final ObjectInspector objectInspector;
 
-  private static final Logger LOG = LogManager.getLogger(HiveAvroSerializer.class);
-
   public HiveAvroSerializer(ObjectInspector objectInspector, List<String> columnNames, List<TypeInfo> columnTypes) {
     this.columnNames = columnNames;
     this.columnTypes = columnTypes;
```
```diff
@@ -104,20 +104,13 @@ public GenericRecord serialize(Object o, Schema schema) {
           break;
         }
         try {
-          // use to digest ci bug, should remove those code , before merged.
           setUpRecordFieldFromWritable(columnTypes.get(i), structFieldsDataAsList.get(i),
               allStructFieldRefs.get(i).getFieldObjectInspector(), record, field);
         } catch (Exception e) {
-          LOG.error(String.format("failed to convert index: %s", i));
-          LOG.error(String.format("current names: %s", columnNames.stream().collect(Collectors.joining(","))));
-          LOG.error(String.format("current type %s",
-              columnTypes.stream().map(f -> f.getTypeName()).collect(Collectors.joining(","))));
+          // print log for debug
+          LOG.error(String.format("current avro schema: %s", schema.toString()));
+          LOG.error(String.format("current columnNames: %s", columnNames.stream().collect(Collectors.joining(","))));
+          LOG.error(String.format("current type: %s", columnTypes.stream().map(f -> f.getTypeName()).collect(Collectors.joining(","))));
+          LOG.error(String.format("current value: %s", HoodieRealtimeRecordReaderUtils.arrayWritableToString((ArrayWritable) o)));
           throw e;
         }
 
       }
       return record;
     }
```
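`HiveAvroSerializer.serialize` converts a Hive row (an `ArrayWritable`) into an Avro `GenericRecord`, driven by the column names and `TypeInfo`s handed to the constructor. A hedged usage sketch, assuming the `ObjectInspector` is built from the same column types with Hive's standard serde2 helpers; the column names and types here are made up:

```java
import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;

public class SerializerSketch {
  public static HiveAvroSerializer build() {
    // Hypothetical two-column row; the real caller derives these from the table definition.
    List<String> columnNames = Arrays.asList("amount", "currency");
    List<TypeInfo> columnTypes = TypeInfoUtils.getTypeInfosFromTypeString("double,string");

    // A writable-backed inspector matching the ArrayWritable rows serialize() receives.
    TypeInfo rowType = TypeInfoFactory.getStructTypeInfo(columnNames, columnTypes);
    ObjectInspector inspector = TypeInfoUtils.getStandardWritableObjectInspectorFromTypeInfo(rowType);

    return new HiveAvroSerializer(inspector, columnNames, columnTypes);
    // Then: GenericRecord rec = serializer.serialize(arrayWritableRow, avroSchema);
  }
}
```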
```diff
@@ -116,7 +116,7 @@
 public class TestBootstrap extends HoodieClientTestBase {
 
   public static final String TRIP_HIVE_COLUMN_TYPES = "bigint,string,string,string,string,double,double,double,double,"
-      + "struct<amount:double,currency:string>,array<struct<amount:double,currency:string>>,boolean";
+      + "struct<amount:double,currency:string>,array<struct<element:struct<amount:double,currency:string>>>,boolean";
 
   @TempDir
   public java.nio.file.Path tmpFolder;
```
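This change wraps each array entry in a single-field struct named `element`, presumably so the Hive type string matches how the list is actually represented in the bootstrap data's Avro/Parquet schema (the commit itself doesn't say). A small sanity-check sketch, not part of the commit, showing that the updated string parses cleanly with Hive's standard type parser:

```java
import java.util.List;

import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;

public class TripTypesSketch {
  public static void main(String[] args) {
    String tripHiveColumnTypes = "bigint,string,string,string,string,double,double,double,double,"
        + "struct<amount:double,currency:string>,array<struct<element:struct<amount:double,currency:string>>>,boolean";

    // Parsing validates the type string; each entry lines up with one column.
    List<TypeInfo> types = TypeInfoUtils.getTypeInfosFromTypeString(tripHiveColumnTypes);
    types.forEach(t -> System.out.println(t.getTypeName()));
  }
}
```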
