Skip to content

Commit

Permalink
ORC-236: Support UNION type in Java Convert tool (#1025)
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?

This patch adds support for converting JSON to the UNION type in ORC files. For
example, for the schema `struct<foo:uniontype<int,string>>`, the following
JSON lines can be parsed into the UNION type.

```
{"foo": {"tag": 0, "value": 1}}
{"foo": {"tag": 1, "value": "testing"}}
{"foo": {"tag": 0, "value": 3}}
```

### Why are the changes needed?

This adds missing support for the UNION type in the Java convert tool.

### How was this patch tested?

Manually tested against a handcrafted JSON file.
  • Loading branch information
rizaon authored Feb 16, 2022
1 parent 0d6ad91 commit 6f44815
Show file tree
Hide file tree
Showing 3 changed files with 88 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ public class ConvertTool {
private final String csvNullString;
private final String timestampFormat;
private final String bloomFilterColumns;
private final String unionTag;
private final String unionValue;
private final Writer writer;
private final VectorizedRowBatch batch;

Expand Down Expand Up @@ -151,7 +153,8 @@ public RecordReader getRecordReader() throws IOException {
}
case JSON: {
FSDataInputStream underlying = filesystem.open(path);
return new JsonReader(getReader(underlying), underlying, size, schema, timestampFormat);
return new JsonReader(getReader(underlying), underlying, size, schema, timestampFormat,
unionTag, unionValue);
}
case CSV: {
FSDataInputStream underlying = filesystem.open(path);
Expand Down Expand Up @@ -196,6 +199,8 @@ public ConvertTool(Configuration conf,
this.csvNullString = opts.getOptionValue('n', "");
this.timestampFormat = opts.getOptionValue("t", DEFAULT_TIMESTAMP_FORMAT);
this.bloomFilterColumns = opts.getOptionValue('b', null);
this.unionTag = opts.getOptionValue("union-tag", "tag");
this.unionValue = opts.getOptionValue("union-value", "value");
String outFilename = opts.hasOption('o')
? opts.getOptionValue('o') : "output.orc";
boolean overwrite = opts.hasOption('O');
Expand Down Expand Up @@ -274,6 +279,14 @@ private static CommandLine parseOptions(String[] args) throws ParseException {
Option.builder("O").longOpt("overwrite").desc("Overwrite an existing file")
.build()
);
options.addOption(
Option.builder().longOpt("union-tag")
.desc("JSON key name representing UNION tag. Default to \"tag\".")
.hasArg().build());
options.addOption(
Option.builder().longOpt("union-value")
.desc("JSON key name representing UNION value. Default to \"value\".")
.hasArg().build());
CommandLine cli = new DefaultParser().parse(options, args);
if (cli.hasOption('h') || cli.getArgs().length == 0) {
HelpFormatter formatter = new HelpFormatter();
Expand Down
43 changes: 43 additions & 0 deletions java/tools/src/java/org/apache/orc/tools/convert/JsonReader.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.UnionColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.orc.RecordReader;
import org.apache.orc.TypeDescription;
Expand Down Expand Up @@ -62,6 +63,8 @@ public class JsonReader implements RecordReader {
private final FSDataInputStream input;
private long rowNumber = 0;
private final DateTimeFormatter dateTimeFormatter;
private String unionTag = "tag";
private String unionValue = "value";

interface JsonConverter {
void convert(JsonElement value, ColumnVector vect, int row);
Expand Down Expand Up @@ -293,6 +296,32 @@ public void convert(JsonElement value, ColumnVector vect, int row) {
}
}

/**
 * Converts a JSON object of the form {"tag": i, "value": v} into a
 * UnionColumnVector entry: the tag selects the union branch and the value
 * is handed to that branch's child converter. The JSON key names are taken
 * from the enclosing reader's unionTag / unionValue settings.
 */
class UnionColumnConverter implements JsonConverter {
  // One converter per union branch, indexed by the branch tag.
  private final JsonConverter[] childConverter;

  UnionColumnConverter(TypeDescription schema) {
    int size = schema.getChildren().size();
    childConverter = new JsonConverter[size];
    for (int i = 0; i < size; i++) {
      childConverter[i] = createConverter(schema.getChildren().get(i));
    }
  }

  @Override
  public void convert(JsonElement value, ColumnVector vect, int row) {
    if (value == null || value.isJsonNull()) {
      vect.noNulls = false;
      vect.isNull[row] = true;
    } else {
      UnionColumnVector vector = (UnionColumnVector) vect;
      JsonObject obj = value.getAsJsonObject();
      JsonElement tagElement = obj.get(unionTag);
      if (tagElement == null) {
        // Without this check a missing tag key surfaces as a bare NPE.
        throw new IllegalArgumentException(
            "Missing union tag field \"" + unionTag + "\" in " + obj);
      }
      int tag = tagElement.getAsInt();
      if (tag < 0 || tag >= childConverter.length) {
        // Without this check a bad tag surfaces as ArrayIndexOutOfBoundsException.
        throw new IllegalArgumentException(
            "Union tag " + tag + " out of range [0, " + childConverter.length +
                ") in " + obj);
      }
      vector.tags[row] = tag;
      // A missing or null value key is passed through; the child converter
      // marks the branch entry as null in that case.
      childConverter[tag].convert(obj.get(unionValue), vector.fields[tag], row);
    }
  }
}

JsonConverter createConverter(TypeDescription schema) {
switch (schema.getCategory()) {
case BYTE:
Expand Down Expand Up @@ -324,11 +353,25 @@ JsonConverter createConverter(TypeDescription schema) {
return new ListColumnConverter(schema);
case MAP:
return new MapColumnConverter(schema);
case UNION:
return new UnionColumnConverter(schema);
default:
throw new IllegalArgumentException("Unhandled type " + schema);
}
}

/**
 * Creates a JSON reader with configurable key names for UNION values.
 * Delegates to the JsonStreamParser-based constructor, then overrides the
 * default union key names ("tag" / "value") with the supplied ones.
 *
 * @param reader source of the JSON text to convert
 * @param underlying the stream backing {@code reader} — presumably used for
 *     progress/size tracking by the delegated constructor; not visible here
 * @param size total size of the input, in bytes
 * @param schema the ORC schema the JSON rows are converted into
 * @param timestampFormat pattern used to parse timestamp values
 * @param unionTag JSON key naming the union branch tag
 * @param unionValue JSON key naming the union branch value
 * @throws IOException if the delegated constructor fails to read
 */
public JsonReader(Reader reader,
                  FSDataInputStream underlying,
                  long size,
                  TypeDescription schema,
                  String timestampFormat,
                  String unionTag,
                  String unionValue) throws IOException {
  // this(...) must be the first statement in Java, so the union key
  // overrides are assigned after delegation completes.
  this(new JsonStreamParser(reader), underlying, size, schema, timestampFormat);
  this.unionTag = unionTag;
  this.unionValue = unionValue;
}

public JsonReader(Reader reader,
FSDataInputStream underlying,
long size,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,11 @@

package org.apache.orc.tools.convert;

import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.DateColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.UnionColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.orc.TypeDescription;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -143,4 +146,32 @@ public void testDateTimeTypeSupport() throws IOException {
assertEquals(datetime6.toInstant(), cv.asScratchTimestamp(5).toInstant());
}

@Test
public void testUnionTypeSupport() throws IOException {
  // Three rows alternating between the int branch (tag 0) and the
  // string branch (tag 1) of uniontype<int,string>.
  String json = String.join("\n",
      "{\"foo\": {\"tag\": 0, \"value\": 1}}",
      "{\"foo\": {\"tag\": 1, \"value\": \"testing\"}}",
      "{\"foo\": {\"tag\": 0, \"value\": 3}}");

  TypeDescription schema = TypeDescription.fromString("struct<foo:uniontype<int,string>>");
  JsonReader reader =
      new JsonReader(new StringReader(json), null, 1, schema, "", "tag", "value");
  VectorizedRowBatch batch = schema.createRowBatch(3);
  assertTrue(reader.nextBatch(batch));
  assertEquals(3, batch.size);

  UnionColumnVector foo = (UnionColumnVector) batch.cols[0];
  LongColumnVector intBranch = (LongColumnVector) foo.fields[0];
  BytesColumnVector strBranch = (BytesColumnVector) foo.fields[1];
  assertTrue(foo.noNulls);

  // Row 0: int branch, value 1.
  assertFalse(foo.isNull[0]);
  assertEquals(0, foo.tags[0]);
  assertEquals(1, intBranch.vector[0]);
  // Row 1: string branch, value "testing".
  assertFalse(foo.isNull[1]);
  assertEquals(1, foo.tags[1]);
  assertEquals("testing", strBranch.toString(1));
  // Row 2: int branch, value 3.
  assertFalse(foo.isNull[2]);
  assertEquals(0, foo.tags[2]);
  assertEquals(3, intBranch.vector[2]);
}
}

0 comments on commit 6f44815

Please sign in to comment.