Skip to content

Commit

Permalink
Implement ScalarFunction MAKE_MAP and MAP (apache#11361)
Browse files Browse the repository at this point in the history
* tmp

* opt

* modify test

* add another version

* implement make_map function

* implement make_map function

* implement map function

* format and modify the doc

* add benchmark for map function

* add empty end-line

* fix cargo check

* update lock

* update lock

* fix clippy

* fmt and clippy

* support FixedSizeList and LargeList

* check type and handle null array in coerce_types

* make array value throw todo error

* fix clippy

* simplify the error tests
  • Loading branch information
goldmedal authored and xinlifoobar committed Jul 17, 2024
1 parent 9a46ee8 commit be2cbba
Show file tree
Hide file tree
Showing 6 changed files with 545 additions and 1 deletion.
1 change: 1 addition & 0 deletions datafusion-cli/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 6 additions & 1 deletion datafusion/functions/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ path = "src/lib.rs"

[dependencies]
arrow = { workspace = true }
arrow-buffer = { workspace = true }
base64 = { version = "0.22", optional = true }
blake2 = { version = "^0.10.2", optional = true }
blake3 = { version = "1.0", optional = true }
Expand All @@ -86,7 +87,6 @@ uuid = { version = "1.7", features = ["v4"], optional = true }

[dev-dependencies]
arrow = { workspace = true, features = ["test_utils"] }
arrow-buffer = { workspace = true }
criterion = "0.5"
rand = { workspace = true }
rstest = { workspace = true }
Expand Down Expand Up @@ -141,3 +141,8 @@ required-features = ["string_expressions"]
harness = false
name = "upper"
required-features = ["string_expressions"]

[[bench]]
harness = false
name = "map"
required-features = ["core_expressions"]
101 changes: 101 additions & 0 deletions datafusion/functions/benches/map.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

extern crate criterion;

use arrow::array::{Int32Array, ListArray, StringArray};
use arrow::datatypes::{DataType, Field};
use arrow_buffer::{OffsetBuffer, ScalarBuffer};
use criterion::{black_box, criterion_group, criterion_main, Criterion};
use datafusion_common::ScalarValue;
use datafusion_expr::ColumnarValue;
use datafusion_functions::core::{make_map, map};
use rand::prelude::ThreadRng;
use rand::Rng;
use std::sync::Arc;

/// Generates 1000 random map keys for the benchmarks.
///
/// Each key is the decimal string form of an integer drawn from `0..9999`
/// using `rng`. Nothing deduplicates the draws, so repeated keys can occur.
fn keys(rng: &mut ThreadRng) -> Vec<String> {
    (0..1000)
        .map(|_| rng.gen_range(0..9999).to_string())
        .collect()
}

/// Generates 1000 random map values for the benchmarks.
///
/// Each value is an `i32` drawn from `0..9999` using `rng`.
fn values(rng: &mut ThreadRng) -> Vec<i32> {
    (0..1000).map(|_| rng.gen_range(0..9999)).collect()
}

/// Registers two Criterion benchmarks, each building a 1000-entry map with
/// `Utf8` keys and `Int32` values:
///
/// * `make_map_1000` invokes `make_map` with a flat argument list of
///   alternating key and value scalars.
/// * `map_1000` invokes `map` with two list scalars, one holding all keys and
///   one holding all values.
///
/// Random inputs are generated once per benchmark, outside `b.iter`, so data
/// generation is not part of the timed loop.
fn criterion_benchmark(c: &mut Criterion) {
    c.bench_function("make_map_1000", |b| {
        let mut rng = rand::thread_rng();
        let key_strings = keys(&mut rng);
        let int_values = values(&mut rng);

        // Interleave the inputs as key, value, key, value, ...
        let mut args = Vec::new();
        for (key, value) in key_strings.iter().zip(int_values.iter()) {
            args.push(ColumnarValue::Scalar(ScalarValue::Utf8(Some(key.clone()))));
            args.push(ColumnarValue::Scalar(ScalarValue::Int32(Some(*value))));
        }

        b.iter(|| {
            let result = make_map()
                .invoke(&args)
                .expect("map should work on valid values");
            black_box(result);
        });
    });

    c.bench_function("map_1000", |b| {
        let mut rng = rand::thread_rng();

        // A single list entry spanning offsets 0..1000 holds every key.
        let key_list = ListArray::new(
            Arc::new(Field::new("item", DataType::Utf8, true)),
            OffsetBuffer::new(ScalarBuffer::from(vec![0, 1000])),
            Arc::new(StringArray::from(keys(&mut rng))),
            None,
        );
        // The values use the same single-entry layout.
        let value_list = ListArray::new(
            Arc::new(Field::new("item", DataType::Int32, true)),
            OffsetBuffer::new(ScalarBuffer::from(vec![0, 1000])),
            Arc::new(Int32Array::from(values(&mut rng))),
            None,
        );

        let key_arg = ColumnarValue::Scalar(ScalarValue::List(Arc::new(key_list)));
        let value_arg = ColumnarValue::Scalar(ScalarValue::List(Arc::new(value_list)));

        b.iter(|| {
            // The argument clones are part of the measured work.
            let args = [key_arg.clone(), value_arg.clone()];
            let result = map()
                .invoke(&args)
                .expect("map should work on valid values");
            black_box(result);
        });
    });
}

criterion_group!(benches, criterion_benchmark);
criterion_main!(benches);
Loading

0 comments on commit be2cbba

Please sign in to comment.