Skip to content

Commit

Permalink
Make AvroArrowArrayReader possible to scan Avro backed table which co…
Browse files Browse the repository at this point in the history
…ntains nested records (#7525)

* Fix for nested Avro records

* Add test for nested records
  • Loading branch information
sarutak authored Sep 14, 2023
1 parent f76a8cd commit 58ddcee
Show file tree
Hide file tree
Showing 5 changed files with 60 additions and 12 deletions.
53 changes: 44 additions & 9 deletions datafusion/core/src/datasource/avro_to_arrow/arrow_array_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,15 +79,47 @@ impl<'a, R: Read> AvroArrowArrayReader<'a, R> {
/// Builds the field-name to position lookup for a top-level Avro record schema.
///
/// For a `Record` schema, the record's own `lookup` map is returned after
/// `child_schema_lookup` has been run over the schema of every field, which
/// adds entries for records nested inside those fields.
///
/// # Errors
///
/// Returns a `DataFusionError::ArrowError` wrapping a `SchemaError` when
/// `schema` is not a `Record`. Errors from `child_schema_lookup` are
/// propagated unchanged.
// NOTE(review): this span appears to be a diff rendering with the +/- markers
// stripped. The `lookup: ref schema_lookup` arm (plain clone) looks like the
// removed version and the `fields, mut lookup` arm its replacement; confirm
// against the real file before relying on either.
pub fn schema_lookup(schema: AvroSchema) -> Result<BTreeMap<String, usize>> {
match schema {
AvroSchema::Record(RecordSchema {
lookup: ref schema_lookup,
..
}) => Ok(schema_lookup.clone()),
fields, mut lookup, ..
}) => {
// Extend the top-level lookup with entries for any nested records.
for field in fields {
Self::child_schema_lookup(&field.schema, &mut lookup)?;
}
Ok(lookup)
}
_ => Err(DataFusionError::ArrowError(SchemaError(
"expected avro schema to be a record".to_string(),
))),
}
}

/// Recursively registers the fields of every record nested inside `schema`
/// into `schema_lookup`.
///
/// * `Record`: each `(field_name, position)` pair of the record's own lookup
///   is inserted under the key `"<record fullname>.<field_name>"`, then the
///   schema of every field is visited in turn.
/// * `Array`: the element schema is visited.
/// * Any other schema variant is left untouched.
///
/// On success the same map that was passed in is handed back as a shared
/// reference. The only errors are those propagated from recursive calls.
fn child_schema_lookup<'b>(
    schema: &AvroSchema,
    schema_lookup: &'b mut BTreeMap<String, usize>,
) -> Result<&'b BTreeMap<String, usize>> {
    match schema {
        AvroSchema::Record(RecordSchema {
            name,
            fields,
            lookup,
            ..
        }) => {
            // Qualify nested field names with the record's full name so they
            // get their own keys, separate from the unqualified top-level ones.
            let record_name = name.fullname(None);
            for (field_name, position) in lookup {
                schema_lookup.insert(format!("{}.{}", record_name, field_name), *position);
            }

            // Records may themselves contain records (or arrays of records).
            for nested in fields {
                Self::child_schema_lookup(&nested.schema, schema_lookup)?;
            }
        }
        AvroSchema::Array(items) => {
            Self::child_schema_lookup(items, schema_lookup)?;
        }
        _ => {}
    }
    Ok(schema_lookup)
}

/// Read the next batch of records
pub fn next_batch(&mut self, batch_size: usize) -> Option<ArrowResult<RecordBatch>> {
let rows_result = self
Expand Down Expand Up @@ -519,25 +551,28 @@ impl<'a, R: Read> AvroArrowArrayReader<'a, R> {
let num_bytes = bit_util::ceil(array_item_count, 8);
let mut null_buffer = MutableBuffer::from_len_zeroed(num_bytes);
let mut struct_index = 0;
let rows: Vec<Vec<(String, Value)>> = rows
let null_struct_array = vec![("null".to_string(), Value::Null)];
let rows: Vec<&Vec<(String, Value)>> = rows
.iter()
.map(|row| {
.flat_map(|row| {
if let Value::Array(values) = row {
values.iter().for_each(|_| {
bit_util::set_bit(&mut null_buffer, struct_index);
struct_index += 1;
});
values
.iter()
.map(|v| ("".to_string(), v.clone()))
.collect::<Vec<(String, Value)>>()
.map(|v| match v {
Value::Record(record) => record,
other => panic!("expected Record, got {other:?}"),
})
.collect::<Vec<&Vec<(String, Value)>>>()
} else {
struct_index += 1;
vec![("null".to_string(), Value::Null)]
vec![&null_struct_array]
}
})
.collect();
let rows = rows.iter().collect::<Vec<&Vec<(String, Value)>>>();
let arrays = self.build_struct_array(&rows, fields, &[])?;
let data_type = DataType::Struct(fields.clone());
ArrayDataBuilder::new(data_type)
Expand Down
2 changes: 1 addition & 1 deletion datafusion/sqllogictest/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,12 @@ postgres-protocol = {version = "0.6.4", optional = true}

[features]
postgres = ["bytes", "chrono", "tokio-postgres", "postgres-types", "postgres-protocol"]
avro = ["datafusion/avro"]

[dev-dependencies]
env_logger = "0.10"
num_cpus = "1.13.0"


[[test]]
harness = false
name = "sqllogictests"
Expand Down
13 changes: 13 additions & 0 deletions datafusion/sqllogictest/test_files/avro.slt
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,12 @@ STORED AS AVRO
WITH HEADER ROW
LOCATION '../../testing/data/avro/single_nan.avro'

statement ok
CREATE EXTERNAL TABLE nested_records
STORED AS AVRO
WITH HEADER ROW
LOCATION '../../testing/data/avro/nested_records.avro'

# test avro query
query IT
SELECT id, CAST(string_col AS varchar) FROM alltypes_plain
Expand Down Expand Up @@ -82,6 +88,13 @@ SELECT id, CAST(string_col AS varchar) FROM alltypes_plain_multi_files
0 0
1 1

# test avro nested records
query ??
SELECT f1, f2 FROM nested_records
----
{ns2.record2.f1_1: aaa, ns2.record2.f1_2: 10, ns2.record2.f1_3: {ns3.record3.f1_3_1: 3.14}} [{ns4.record4.f2_1: true, ns4.record4.f2_2: 1.2}, {ns4.record4.f2_1: true, ns4.record4.f2_2: 2.2}]
{ns2.record2.f1_1: bbb, ns2.record2.f1_2: 20, ns2.record2.f1_3: {ns3.record3.f1_3_1: 3.14}} [{ns4.record4.f2_1: false, ns4.record4.f2_2: 10.2}]

# test avro explain
query TT
EXPLAIN SELECT count(*) from alltypes_plain
Expand Down
2 changes: 1 addition & 1 deletion testing

0 comments on commit 58ddcee

Please sign in to comment.