From c72862b61399ff516e968fbd02885e573d4be81c Mon Sep 17 00:00:00 2001 From: Gabor Szadovszky Date: Wed, 12 May 2021 10:08:05 +0200 Subject: [PATCH] PARQUET-2037: Write INT96 with parquet-avro (#901) --- parquet-avro/README.md | 2 + .../parquet/avro/AvroSchemaConverter.java | 66 ++++++++++----- .../apache/parquet/avro/AvroWriteSupport.java | 3 + .../parquet/avro/TestAvroSchemaConverter.java | 32 ++++++++ .../src/test/resources/fixedToInt96.avsc | 80 +++++++++++++++++++ 5 files changed, 161 insertions(+), 22 deletions(-) create mode 100644 parquet-avro/src/test/resources/fixedToInt96.avsc diff --git a/parquet-avro/README.md b/parquet-avro/README.md index 8b1cca2550..4f67491584 100644 --- a/parquet-avro/README.md +++ b/parquet-avro/README.md @@ -32,6 +32,7 @@ Apache Avro integration | `parquet.avro.read.schema` | `String` | The Avro schema to be used for reading. It shall be compatible with the file schema. The file schema will be used directly if not set. | | `parquet.avro.projection` | `String` | The Avro schema to be used for projection. | | `parquet.avro.compatible` | `boolean` | Flag for compatibility mode. `true` for materializing Avro `IndexedRecord` objects, `false` for materializing the related objects for either generic, specific, or reflect records.
The default value is `true`. | +| `parquet.avro.readInt96AsFixed` | `boolean` | Flag for handling `INT96` Parquet types. `true` for converting them to the Avro `fixed` type, `false` for not handling `INT96` types (an exception is thrown).
The default value is `false`.
**NOTE: The `INT96` Parquet type is deprecated. This option is only to support old data.** | ### Configuration for writing @@ -42,3 +43,4 @@ Apache Avro integration | `parquet.avro.write-old-list-structure` | `boolean` | Flag whether to write list structures in the old way (2 levels) or the new way (3 levels). When writing at 2 levels, no null values are available at the element level.
The default value is `true`. | | `parquet.avro.add-list-element-records` | `boolean` | Flag whether to assume that any repeated element in the schema is a list element.
The default value is `true`. | | `parquet.avro.write-parquet-uuid` | `boolean` | Flag whether to write the [Parquet UUID logical type](https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#uuid) in case an [Avro UUID type](https://avro.apache.org/docs/current/spec.html#UUID) is present.
The default value is `false`. | +| `parquet.avro.writeFixedAsInt96` | `String` | Comma-separated list of paths pointing to Avro schema elements that are to be converted to the `INT96` Parquet type.
A path is a `'.'`-separated list of field names and does not include the schema name or the namespace. The referenced schema elements must be of type `fixed` with a size of 12 bytes.
**NOTE: The `INT96` Parquet type is deprecated. This option is only to support old data.** | diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java index 4c06e9c9b2..7d1f3cab9f 100644 --- a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java +++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java @@ -35,16 +35,20 @@ import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Set; import static java.util.Optional.empty; import static java.util.Optional.of; import static org.apache.avro.JsonProperties.NULL_VALUE; import static org.apache.parquet.avro.AvroReadSupport.READ_INT96_AS_FIXED; import static org.apache.parquet.avro.AvroReadSupport.READ_INT96_AS_FIXED_DEFAULT; +import static org.apache.parquet.avro.AvroWriteSupport.WRITE_FIXED_AS_INT96; import static org.apache.parquet.avro.AvroWriteSupport.WRITE_OLD_LIST_STRUCTURE; import static org.apache.parquet.avro.AvroWriteSupport.WRITE_OLD_LIST_STRUCTURE_DEFAULT; import static org.apache.parquet.avro.AvroWriteSupport.WRITE_PARQUET_UUID; @@ -77,6 +81,7 @@ public class AvroSchemaConverter { private final boolean writeOldListStructure; private final boolean writeParquetUUID; private final boolean readInt96AsFixed; + private final Set pathsToInt96; public AvroSchemaConverter() { this(ADD_LIST_ELEMENT_RECORDS_DEFAULT); @@ -93,6 +98,7 @@ public AvroSchemaConverter() { this.writeOldListStructure = WRITE_OLD_LIST_STRUCTURE_DEFAULT; this.writeParquetUUID = WRITE_PARQUET_UUID_DEFAULT; this.readInt96AsFixed = READ_INT96_AS_FIXED_DEFAULT; + this.pathsToInt96 = Collections.emptySet(); } public AvroSchemaConverter(Configuration conf) { @@ -102,6 +108,7 @@ public AvroSchemaConverter(Configuration conf) { WRITE_OLD_LIST_STRUCTURE, WRITE_OLD_LIST_STRUCTURE_DEFAULT); this.writeParquetUUID = conf.getBoolean(WRITE_PARQUET_UUID, WRITE_PARQUET_UUID_DEFAULT); this.readInt96AsFixed = conf.getBoolean(READ_INT96_AS_FIXED, READ_INT96_AS_FIXED_DEFAULT); + this.pathsToInt96 = new HashSet<>(Arrays.asList(conf.getStrings(WRITE_FIXED_AS_INT96, new String[0]))); } /** @@ -134,26 +141,26 @@ public MessageType convert(Schema avroSchema) { if (!avroSchema.getType().equals(Schema.Type.RECORD)) { throw new IllegalArgumentException("Avro schema must be a record."); } - return new MessageType(avroSchema.getFullName(), convertFields(avroSchema.getFields())); + return new MessageType(avroSchema.getFullName(), convertFields(avroSchema.getFields(), "")); } - private List convertFields(List fields) { + private List convertFields(List fields, String schemaPath) { List types = new ArrayList(); for (Schema.Field field : fields) { if (field.schema().getType().equals(Schema.Type.NULL)) { continue; // Avro nulls are not encoded, unless they are null unions } - types.add(convertField(field)); + types.add(convertField(field, appendPath(schemaPath, field.name()))); } return types; } - private Type convertField(String fieldName, Schema schema) { - return convertField(fieldName, schema, Type.Repetition.REQUIRED); + private Type convertField(String fieldName, Schema schema, String schemaPath) { + return convertField(fieldName, schema, Type.Repetition.REQUIRED, schemaPath); } @SuppressWarnings("deprecation") - private Type convertField(String fieldName, Schema schema, 
Type.Repetition repetition) { + private Type convertField(String fieldName, Schema schema, Type.Repetition repetition, String schemaPath) { Types.PrimitiveBuilder builder; Schema.Type type = schema.getType(); LogicalType logicalType = schema.getLogicalType(); @@ -177,26 +184,33 @@ private Type convertField(String fieldName, Schema schema, Type.Repetition repet builder = Types.primitive(BINARY, repetition).as(stringType()); } } else if (type.equals(Schema.Type.RECORD)) { - return new GroupType(repetition, fieldName, convertFields(schema.getFields())); + return new GroupType(repetition, fieldName, convertFields(schema.getFields(), schemaPath)); } else if (type.equals(Schema.Type.ENUM)) { builder = Types.primitive(BINARY, repetition).as(enumType()); } else if (type.equals(Schema.Type.ARRAY)) { if (writeOldListStructure) { return ConversionPatterns.listType(repetition, fieldName, - convertField("array", schema.getElementType(), REPEATED)); + convertField("array", schema.getElementType(), REPEATED, schemaPath)); } else { return ConversionPatterns.listOfElements(repetition, fieldName, - convertField(AvroWriteSupport.LIST_ELEMENT_NAME, schema.getElementType())); + convertField(AvroWriteSupport.LIST_ELEMENT_NAME, schema.getElementType(), schemaPath)); } } else if (type.equals(Schema.Type.MAP)) { - Type valType = convertField("value", schema.getValueType()); + Type valType = convertField("value", schema.getValueType(), schemaPath); // avro map key type is always string return ConversionPatterns.stringKeyMapType(repetition, fieldName, valType); } else if (type.equals(Schema.Type.FIXED)) { - builder = Types.primitive(FIXED_LEN_BYTE_ARRAY, repetition) - .length(schema.getFixedSize()); + if (pathsToInt96.contains(schemaPath)) { + if (schema.getFixedSize() != 12) { + throw new IllegalArgumentException( + "The size of the fixed type field " + schemaPath + " must be 12 bytes for INT96 conversion"); + } + builder = Types.primitive(PrimitiveTypeName.INT96, repetition); + } else { + builder = Types.primitive(FIXED_LEN_BYTE_ARRAY, repetition).length(schema.getFixedSize()); + } } else if (type.equals(Schema.Type.UNION)) { - return convertUnion(fieldName, schema, repetition); + return convertUnion(fieldName, schema, repetition, schemaPath); } else { throw new UnsupportedOperationException("Cannot convert Avro type " + type); } @@ -218,7 +232,7 @@ private Type convertField(String fieldName, Schema schema, Type.Repetition repet return builder.named(fieldName); } - private Type convertUnion(String fieldName, Schema schema, Type.Repetition repetition) { + private Type convertUnion(String fieldName, Schema schema, Type.Repetition repetition, String schemaPath) { List nonNullSchemas = new ArrayList(schema.getTypes().size()); // Found any schemas in the union? Required for the edge case, where the union contains only a single type. boolean foundNullSchema = false; @@ -239,25 +253,26 @@ private Type convertUnion(String fieldName, Schema schema, Type.Repetition repet throw new UnsupportedOperationException("Cannot convert Avro union of only nulls"); case 1: - return foundNullSchema ? convertField(fieldName, nonNullSchemas.get(0), repetition) : - convertUnionToGroupType(fieldName, repetition, nonNullSchemas); + return foundNullSchema ? 
convertField(fieldName, nonNullSchemas.get(0), repetition, schemaPath) : + convertUnionToGroupType(fieldName, repetition, nonNullSchemas, schemaPath); default: // complex union type - return convertUnionToGroupType(fieldName, repetition, nonNullSchemas); + return convertUnionToGroupType(fieldName, repetition, nonNullSchemas, schemaPath); } } - private Type convertUnionToGroupType(String fieldName, Type.Repetition repetition, List nonNullSchemas) { + private Type convertUnionToGroupType(String fieldName, Type.Repetition repetition, List nonNullSchemas, + String schemaPath) { List unionTypes = new ArrayList(nonNullSchemas.size()); int index = 0; for (Schema childSchema : nonNullSchemas) { - unionTypes.add( convertField("member" + index++, childSchema, Type.Repetition.OPTIONAL)); + unionTypes.add( convertField("member" + index++, childSchema, Type.Repetition.OPTIONAL, schemaPath)); } return new GroupType(repetition, fieldName, unionTypes); } - private Type convertField(Schema.Field field) { - return convertField(field.name(), field.schema()); + private Type convertField(Schema.Field field, String schemaPath) { + return convertField(field.name(), field.schema(), schemaPath); } public Schema convert(MessageType parquetSchema) { @@ -314,7 +329,7 @@ public Schema convertINT96(PrimitiveTypeName primitiveTypeName) { return Schema.createFixed("INT96", "INT96 represented as byte[12]", null, 12); } throw new IllegalArgumentException( - "INT96 is deprecated. As interim enable READ_INT96_AS_FIXED flag to read as byte array."); + "INT96 is deprecated. As interim enable READ_INT96_AS_FIXED flag to read as byte array."); } @Override public Schema convertFLOAT(PrimitiveTypeName primitiveTypeName) { @@ -524,4 +539,11 @@ private static Schema optional(Schema original) { Schema.create(Schema.Type.NULL), original)); } + + private static String appendPath(String path, String fieldName) { + if (path == null || path.isEmpty()) { + return fieldName; + } + return path + '.' + fieldName; + } } diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroWriteSupport.java b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroWriteSupport.java index 440658773b..82a80d31d3 100644 --- a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroWriteSupport.java +++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroWriteSupport.java @@ -67,6 +67,9 @@ public static void setAvroDataSupplier( public static final String WRITE_PARQUET_UUID = "parquet.avro.write-parquet-uuid"; static final boolean WRITE_PARQUET_UUID_DEFAULT = false; + // Support writing Parquet INT96 from a 12-byte Avro fixed. 
+ public static final String WRITE_FIXED_AS_INT96 = "parquet.avro.writeFixedAsInt96"; + private static final String MAP_REPEATED_NAME = "key_value"; private static final String MAP_KEY_NAME = "key"; private static final String MAP_VALUE_NAME = "value"; diff --git a/parquet-avro/src/test/java/org/apache/parquet/avro/TestAvroSchemaConverter.java b/parquet-avro/src/test/java/org/apache/parquet/avro/TestAvroSchemaConverter.java index 065a63694a..1bafdec1e3 100644 --- a/parquet-avro/src/test/java/org/apache/parquet/avro/TestAvroSchemaConverter.java +++ b/parquet-avro/src/test/java/org/apache/parquet/avro/TestAvroSchemaConverter.java @@ -44,6 +44,7 @@ import static org.apache.parquet.avro.AvroTestUtil.optionalField; import static org.apache.parquet.avro.AvroTestUtil.primitive; import static org.apache.parquet.avro.AvroTestUtil.record; +import static org.apache.parquet.avro.AvroWriteSupport.WRITE_FIXED_AS_INT96; import static org.apache.parquet.schema.OriginalType.DATE; import static org.apache.parquet.schema.OriginalType.TIMESTAMP_MICROS; import static org.apache.parquet.schema.OriginalType.TIMESTAMP_MILLIS; @@ -824,6 +825,37 @@ public void testUUIDTypeWithParquetUUID() throws Exception { "}\n"); } + @Test + public void testAvroFixed12AsParquetInt96Type() throws Exception { + Schema schema = new Schema.Parser().parse( + Resources.getResource("fixedToInt96.avsc").openStream()); + + Configuration conf = new Configuration(); + conf.setStrings(WRITE_FIXED_AS_INT96, "int96", "mynestedrecord.int96inrecord", "mynestedrecord.myarrayofoptional", + "mynestedrecord.mymap"); + testAvroToParquetConversion(conf, schema, "message org.apache.parquet.avro.fixedToInt96 {\n" + + " required int96 int96;\n" + + " required fixed_len_byte_array(12) notanint96;\n" + + " required group mynestedrecord {\n" + + " required int96 int96inrecord;\n" + + " required group myarrayofoptional (LIST) {\n" + + " repeated int96 array;\n" + + " }\n" + + " required group mymap (MAP) {\n" + + " repeated group key_value (MAP_KEY_VALUE) {\n" + + " required binary key (STRING);\n" + + " required int96 value;\n" + + " }\n" + + " }\n" + + " }\n" + + " required fixed_len_byte_array(1) onebytefixed;\n" + + "}"); + + conf.setStrings(WRITE_FIXED_AS_INT96, "onebytefixed"); + assertThrows("Exception should be thrown for fixed types to be converted to INT96 where the size is not 12 bytes", + IllegalArgumentException.class, () -> new AvroSchemaConverter(conf).convert(schema)); + } + public static Schema optional(Schema original) { return Schema.createUnion(Lists.newArrayList( Schema.create(Schema.Type.NULL), diff --git a/parquet-avro/src/test/resources/fixedToInt96.avsc b/parquet-avro/src/test/resources/fixedToInt96.avsc new file mode 100644 index 0000000000..97028521c8 --- /dev/null +++ b/parquet-avro/src/test/resources/fixedToInt96.avsc @@ -0,0 +1,80 @@ +{ + "name": "fixedToInt96", + "namespace": "org.apache.parquet.avro", + "type": "record", + "fields": [ + { + "name": "int96", + "type": { + "type": "fixed", + "name": "ignored1", + "namespace": "", + "size": 12 + } + }, + { + "name": "notanint96", + "type": { + "type": "fixed", + "name": "ignored2", + "namespace": "", + "size": 12 + } + }, + { + "name": "mynestedrecord", + "type": { + "type": "record", + "name": "ignored3", + "namespace": "", + "fields": [ + { + "name": "int96inrecord", + "type": { + "type": "fixed", + "name": "ignored4", + "namespace": "", + "size": 12 + } + }, + { + "name": "myarrayofoptional", + "type": { + "type": "array", + "items": [ + "null", + { + "type": "fixed", + 
"name": "ignored5", + "namespace": "", + "size": 12 + } + ] + } + }, + { + "name": "mymap", + "type": { + "type": "map", + "values": { + "type": "fixed", + "name": "ignored6", + "namespace": "", + "size": 12 + } + } + } + ] + } + }, + { + "name": "onebytefixed", + "type": { + "type": "fixed", + "name": "ignored7", + "namespace": "", + "size": 1 + } + } + ] +}