Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ORC-236: Support UNION type in Java Convert tool #1025

Merged
merged 4 commits into from
Feb 16, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ public class ConvertTool {
private final String csvNullString;
private final String timestampFormat;
private final String bloomFilterColumns;
private final String unionTag;
private final String unionValue;
private final Writer writer;
private final VectorizedRowBatch batch;

Expand Down Expand Up @@ -151,7 +153,8 @@ public RecordReader getRecordReader() throws IOException {
}
case JSON: {
FSDataInputStream underlying = filesystem.open(path);
return new JsonReader(getReader(underlying), underlying, size, schema, timestampFormat);
return new JsonReader(getReader(underlying), underlying, size, schema, timestampFormat,
unionTag, unionValue);
}
case CSV: {
FSDataInputStream underlying = filesystem.open(path);
Expand Down Expand Up @@ -196,6 +199,8 @@ public ConvertTool(Configuration conf,
this.csvNullString = opts.getOptionValue('n', "");
this.timestampFormat = opts.getOptionValue("t", DEFAULT_TIMESTAMP_FORMAT);
this.bloomFilterColumns = opts.getOptionValue('b', null);
this.unionTag = opts.getOptionValue("union-tag", "tag");
this.unionValue = opts.getOptionValue("union-value", "value");
String outFilename = opts.hasOption('o')
? opts.getOptionValue('o') : "output.orc";
boolean overwrite = opts.hasOption('O');
Expand Down Expand Up @@ -274,6 +279,14 @@ private static CommandLine parseOptions(String[] args) throws ParseException {
Option.builder("O").longOpt("overwrite").desc("Overwrite an existing file")
.build()
);
options.addOption(
Option.builder().longOpt("union-tag")
.desc("JSON key name representing UNION tag. Default to \"tag\".")
.hasArg().build());
options.addOption(
Option.builder().longOpt("union-value")
.desc("JSON key name representing UNION value. Default to \"value\".")
.hasArg().build());
CommandLine cli = new DefaultParser().parse(options, args);
if (cli.hasOption('h') || cli.getArgs().length == 0) {
HelpFormatter formatter = new HelpFormatter();
Expand Down
43 changes: 43 additions & 0 deletions java/tools/src/java/org/apache/orc/tools/convert/JsonReader.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.UnionColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.orc.RecordReader;
import org.apache.orc.TypeDescription;
Expand Down Expand Up @@ -62,6 +63,8 @@ public class JsonReader implements RecordReader {
private final FSDataInputStream input;
private long rowNumber = 0;
private final DateTimeFormatter dateTimeFormatter;
private String unionTag = "tag";
private String unionValue = "value";

interface JsonConverter {
void convert(JsonElement value, ColumnVector vect, int row);
Expand Down Expand Up @@ -293,6 +296,32 @@ public void convert(JsonElement value, ColumnVector vect, int row) {
}
}

class UnionColumnConverter implements JsonConverter {
private JsonConverter[] childConverter;

UnionColumnConverter(TypeDescription schema) {
int size = schema.getChildren().size();
childConverter = new JsonConverter[size];
for (int i = 0; i < size; i++) {
childConverter[i] = createConverter(schema.getChildren().get(i));
}
}

@Override
public void convert(JsonElement value, ColumnVector vect, int row) {
if (value == null || value.isJsonNull()) {
vect.noNulls = false;
vect.isNull[row] = true;
} else {
UnionColumnVector vector = (UnionColumnVector) vect;
JsonObject obj = value.getAsJsonObject();
int tag = obj.get(unionTag).getAsInt();
vector.tags[row] = tag;
childConverter[tag].convert(obj.get(unionValue), vector.fields[tag], row);
}
}
}

JsonConverter createConverter(TypeDescription schema) {
switch (schema.getCategory()) {
case BYTE:
Expand Down Expand Up @@ -324,11 +353,25 @@ JsonConverter createConverter(TypeDescription schema) {
return new ListColumnConverter(schema);
case MAP:
return new MapColumnConverter(schema);
case UNION:
return new UnionColumnConverter(schema);
default:
throw new IllegalArgumentException("Unhandled type " + schema);
}
}

public JsonReader(Reader reader,
FSDataInputStream underlying,
long size,
TypeDescription schema,
String timestampFormat,
String unionTag,
String unionValue) throws IOException {
this(new JsonStreamParser(reader), underlying, size, schema, timestampFormat);
this.unionTag = unionTag;
this.unionValue = unionValue;
}

public JsonReader(Reader reader,
FSDataInputStream underlying,
long size,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,11 @@

package org.apache.orc.tools.convert;

import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.DateColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.UnionColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.orc.TypeDescription;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -143,4 +146,32 @@ public void testDateTimeTypeSupport() throws IOException {
assertEquals(datetime6.toInstant(), cv.asScratchTimestamp(5).toInstant());
}

@Test
public void testUnionTypeSupport() throws IOException {
String inputString = "{\"foo\": {\"tag\": 0, \"value\": 1}}\n" +
"{\"foo\": {\"tag\": 1, \"value\": \"testing\"}}\n" +
"{\"foo\": {\"tag\": 0, \"value\": 3}}";


StringReader input = new StringReader(inputString);

TypeDescription schema = TypeDescription.fromString("struct<foo:uniontype<int,string>>");
JsonReader reader = new JsonReader(input, null, 1, schema, "", "tag", "value");
VectorizedRowBatch batch = schema.createRowBatch(3);
assertTrue(reader.nextBatch(batch));
assertEquals(3, batch.size);
UnionColumnVector union = (UnionColumnVector) batch.cols[0];
LongColumnVector longs = (LongColumnVector) union.fields[0];
BytesColumnVector strs = (BytesColumnVector) union.fields[1];
assertTrue(union.noNulls);
assertFalse(union.isNull[0]);
assertEquals(0, union.tags[0]);
assertEquals(1, longs.vector[0]);
assertFalse(union.isNull[1]);
assertEquals(1, union.tags[1]);
assertEquals("testing", strs.toString(1));
assertFalse(union.isNull[2]);
assertEquals(0, union.tags[2]);
assertEquals(3, longs.vector[2]);
}
}