Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Run ruff format in CI #837

Merged
merged 3 commits into from
Aug 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,9 @@ jobs:
pip install ruff
# Update output format to enable automatic inline annotations.
- name: Run Ruff
run: ruff check --output-format=github python/
run: |
ruff check --output-format=github python/
ruff format --check python/

generate-license:
runs-on: ubuntu-latest
Expand Down
9 changes: 7 additions & 2 deletions python/datafusion/functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -1479,12 +1479,17 @@ def approx_percentile_cont(
"""Returns the value that is approximately at a given percentile of ``expr``."""
if num_centroids is None:
return Expr(
f.approx_percentile_cont(expression.expr, percentile.expr, distinct=distinct, num_centroids=None)
f.approx_percentile_cont(
expression.expr, percentile.expr, distinct=distinct, num_centroids=None
)
)

return Expr(
f.approx_percentile_cont(
expression.expr, percentile.expr, distinct=distinct, num_centroids=num_centroids.expr
expression.expr,
percentile.expr,
distinct=distinct,
num_centroids=num_centroids.expr,
)
)

Expand Down
105 changes: 62 additions & 43 deletions python/datafusion/tests/test_aggregation.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,56 +39,74 @@ def df():
)
return ctx.create_dataframe([[batch]])


@pytest.fixture
def df_aggregate_100():
ctx = SessionContext()
ctx.register_csv("aggregate_test_data", "./testing/data/csv/aggregate_test_100.csv")
return ctx.table("aggregate_test_data")


@pytest.mark.parametrize("agg_expr, calc_expected", [
(f.avg(column("a")), lambda a, b, c, d: np.array(np.average(a))),
(f.corr(column("a"), column("b")), lambda a, b, c, d: np.array(np.corrcoef(a, b)[0][1])),
(f.count(column("a")), lambda a, b, c, d: pa.array([len(a)])),
# Sample (co)variance -> ddof=1
# Population (co)variance -> ddof=0
(f.covar(column("a"), column("b")), lambda a, b, c, d: np.array(np.cov(a, b, ddof=1)[0][1])),
(f.covar_pop(column("a"), column("c")), lambda a, b, c, d: np.array(np.cov(a, c, ddof=0)[0][1])),
(f.covar_samp(column("b"), column("c")), lambda a, b, c, d: np.array(np.cov(b, c, ddof=1)[0][1])),
# f.grouping(col_a), # No physical plan implemented yet
(f.max(column("a")), lambda a, b, c, d: np.array(np.max(a))),
(f.mean(column("b")), lambda a, b, c, d: np.array(np.mean(b))),
(f.median(column("b")), lambda a, b, c, d: np.array(np.median(b))),
(f.min(column("a")), lambda a, b, c, d: np.array(np.min(a))),
(f.sum(column("b")), lambda a, b, c, d: np.array(np.sum(b.to_pylist()))),
# Sample stdev -> ddof=1
# Population stdev -> ddof=0
(f.stddev(column("a")), lambda a, b, c, d: np.array(np.std(a, ddof=1))),
(f.stddev_pop(column("b")), lambda a, b, c, d: np.array(np.std(b, ddof=0))),
(f.stddev_samp(column("c")), lambda a, b, c, d: np.array(np.std(c, ddof=1))),
(f.var(column("a")), lambda a, b, c, d: np.array(np.var(a, ddof=1))),
(f.var_pop(column("b")), lambda a, b, c, d: np.array(np.var(b, ddof=0))),
(f.var_samp(column("c")), lambda a, b, c, d: np.array(np.var(c, ddof=1))),
])
@pytest.mark.parametrize(
"agg_expr, calc_expected",
[
(f.avg(column("a")), lambda a, b, c, d: np.array(np.average(a))),
(
f.corr(column("a"), column("b")),
lambda a, b, c, d: np.array(np.corrcoef(a, b)[0][1]),
),
(f.count(column("a")), lambda a, b, c, d: pa.array([len(a)])),
# Sample (co)variance -> ddof=1
# Population (co)variance -> ddof=0
(
f.covar(column("a"), column("b")),
lambda a, b, c, d: np.array(np.cov(a, b, ddof=1)[0][1]),
),
(
f.covar_pop(column("a"), column("c")),
lambda a, b, c, d: np.array(np.cov(a, c, ddof=0)[0][1]),
),
(
f.covar_samp(column("b"), column("c")),
lambda a, b, c, d: np.array(np.cov(b, c, ddof=1)[0][1]),
),
# f.grouping(col_a), # No physical plan implemented yet
(f.max(column("a")), lambda a, b, c, d: np.array(np.max(a))),
(f.mean(column("b")), lambda a, b, c, d: np.array(np.mean(b))),
(f.median(column("b")), lambda a, b, c, d: np.array(np.median(b))),
(f.min(column("a")), lambda a, b, c, d: np.array(np.min(a))),
(f.sum(column("b")), lambda a, b, c, d: np.array(np.sum(b.to_pylist()))),
# Sample stdev -> ddof=1
# Population stdev -> ddof=0
(f.stddev(column("a")), lambda a, b, c, d: np.array(np.std(a, ddof=1))),
(f.stddev_pop(column("b")), lambda a, b, c, d: np.array(np.std(b, ddof=0))),
(f.stddev_samp(column("c")), lambda a, b, c, d: np.array(np.std(c, ddof=1))),
(f.var(column("a")), lambda a, b, c, d: np.array(np.var(a, ddof=1))),
(f.var_pop(column("b")), lambda a, b, c, d: np.array(np.var(b, ddof=0))),
(f.var_samp(column("c")), lambda a, b, c, d: np.array(np.var(c, ddof=1))),
],
)
def test_aggregation_stats(df, agg_expr, calc_expected):

agg_df = df.aggregate([], [agg_expr])
result = agg_df.collect()[0]
values_a, values_b, values_c, values_d = df.collect()[0]
expected = calc_expected(values_a, values_b, values_c, values_d)
np.testing.assert_array_almost_equal(result.column(0), expected)


@pytest.mark.parametrize("agg_expr, expected", [
(f.approx_distinct(column("b")), pa.array([2], type=pa.uint64())),
(f.approx_median(column("b")), pa.array([4])),
(f.approx_percentile_cont(column("b"), lit(0.5)), pa.array([4])),
(
f.approx_percentile_cont_with_weight(column("b"), lit(0.6), lit(0.5)),
pa.array([6], type=pa.float64())
),
(f.array_agg(column("b")), pa.array([[4, 4, 6]])),
])
@pytest.mark.parametrize(
"agg_expr, expected",
[
(f.approx_distinct(column("b")), pa.array([2], type=pa.uint64())),
(f.approx_median(column("b")), pa.array([4])),
(f.approx_percentile_cont(column("b"), lit(0.5)), pa.array([4])),
(
f.approx_percentile_cont_with_weight(column("b"), lit(0.6), lit(0.5)),
pa.array([6], type=pa.float64()),
),
(f.array_agg(column("b")), pa.array([[4, 4, 6]])),
],
)
def test_aggregation(df, agg_expr, expected):
agg_df = df.aggregate([], [agg_expr])
result = agg_df.collect()[0]
Expand All @@ -98,20 +116,21 @@ def test_aggregation(df, agg_expr, expected):
def test_aggregate_100(df_aggregate_100):
# https://github.com/apache/datafusion/blob/bddb6415a50746d2803dd908d19c3758952d74f9/datafusion/sqllogictest/test_files/aggregate.slt#L1490-L1498

result = df_aggregate_100.aggregate(
[
column("c1")
],
[
f.approx_percentile_cont(column("c3"), lit(0.95), lit(200)).alias("c3")
]
).sort(column("c1").sort(ascending=True)).collect()
result = (
df_aggregate_100.aggregate(
[column("c1")],
[f.approx_percentile_cont(column("c3"), lit(0.95), lit(200)).alias("c3")],
)
.sort(column("c1").sort(ascending=True))
.collect()
)

assert len(result) == 1
result = result[0]
assert result.column("c1") == pa.array(["a", "b", "c", "d", "e"])
assert result.column("c3") == pa.array([73, 68, 122, 124, 115])


def test_bit_add_or_xor(df):
df = df.aggregate(
[],
Expand Down
90 changes: 50 additions & 40 deletions python/datafusion/tests/test_dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -279,57 +279,67 @@ def test_distinct():


data_test_window_functions = [
("row", f.window("row_number", [], order_by=[f.order_by(column("c"))]), [2, 1, 3]),
("rank", f.window("rank", [], order_by=[f.order_by(column("c"))]), [2, 1, 2]),
("dense_rank", f.window("dense_rank", [], order_by=[f.order_by(column("c"))]), [2, 1, 2] ),
("percent_rank", f.window("percent_rank", [], order_by=[f.order_by(column("c"))]), [0.5, 0, 0.5]),
("cume_dist", f.window("cume_dist", [], order_by=[f.order_by(column("b"))]), [0.3333333333333333, 0.6666666666666666, 1.0]),
("ntile", f.window("ntile", [literal(2)], order_by=[f.order_by(column("c"))]), [1, 1, 2]),
("next", f.window("lead", [column("b")], order_by=[f.order_by(column("b"))]), [5, 6, None]),
("previous", f.window("lag", [column("b")], order_by=[f.order_by(column("b"))]), [None, 4, 5]),
pytest.param(
"first_value",
f.window(
("row", f.window("row_number", [], order_by=[f.order_by(column("c"))]), [2, 1, 3]),
("rank", f.window("rank", [], order_by=[f.order_by(column("c"))]), [2, 1, 2]),
(
"dense_rank",
f.window("dense_rank", [], order_by=[f.order_by(column("c"))]),
[2, 1, 2],
),
(
"percent_rank",
f.window("percent_rank", [], order_by=[f.order_by(column("c"))]),
[0.5, 0, 0.5],
),
(
"cume_dist",
f.window("cume_dist", [], order_by=[f.order_by(column("b"))]),
[0.3333333333333333, 0.6666666666666666, 1.0],
),
(
"ntile",
f.window("ntile", [literal(2)], order_by=[f.order_by(column("c"))]),
[1, 1, 2],
),
(
"next",
f.window("lead", [column("b")], order_by=[f.order_by(column("b"))]),
[5, 6, None],
),
(
"previous",
f.window("lag", [column("b")], order_by=[f.order_by(column("b"))]),
[None, 4, 5],
),
pytest.param(
"first_value",
[column("a")],
order_by=[f.order_by(column("b"))]
f.window("first_value", [column("a")], order_by=[f.order_by(column("b"))]),
[1, 1, 1],
),
pytest.param(
"last_value",
f.window("last_value", [column("b")], order_by=[f.order_by(column("b"))]),
[4, 5, 6],
),
[1, 1, 1],
),
pytest.param(
"last_value",
f.window("last_value", [column("b")], order_by=[f.order_by(column("b"))]),
[4, 5, 6],
),
pytest.param(
"2nd_value",
f.window(
"nth_value",
[column("b"), literal(2)],
order_by=[f.order_by(column("b"))],
pytest.param(
"2nd_value",
f.window(
"nth_value",
[column("b"), literal(2)],
order_by=[f.order_by(column("b"))],
),
[None, 5, 5],
),
[None, 5, 5],
),
]


@pytest.mark.parametrize("name,expr,result", data_test_window_functions)
def test_window_functions(df, name, expr, result):
df = df.select(
column("a"),
column("b"),
column("c"),
f.alias(expr, name)
)
df = df.select(column("a"), column("b"), column("c"), f.alias(expr, name))

table = pa.Table.from_batches(df.collect())

expected = {
"a": [1, 2, 3],
"b": [4, 5, 6],
"c": [8, 5, 8],
name: result
}
expected = {"a": [1, 2, 3], "b": [4, 5, 6], "c": [8, 5, 8], name: result}

assert table.sort_by("a").to_pydict() == expected

Expand Down
14 changes: 8 additions & 6 deletions python/datafusion/tests/test_expr.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,24 +146,26 @@ def test_expr_to_variant():
from datafusion import SessionContext
from datafusion.expr import Filter


def traverse_logical_plan(plan):
cur_node = plan.to_variant()
if isinstance(cur_node, Filter):
return cur_node.predicate().to_variant()
if hasattr(plan, 'inputs'):
if hasattr(plan, "inputs"):
for input_plan in plan.inputs():
res = traverse_logical_plan(input_plan)
if res is not None:
return res

ctx = SessionContext()
data = {'id': [1, 2, 3], 'name': ['Alice', 'Bob', 'Charlie']}
ctx.from_pydict(data, name='table1')
data = {"id": [1, 2, 3], "name": ["Alice", "Bob", "Charlie"]}
ctx.from_pydict(data, name="table1")
query = "SELECT * FROM table1 t1 WHERE t1.name IN ('dfa', 'ad', 'dfre', 'vsa')"
logical_plan = ctx.sql(query).optimized_logical_plan()
variant = traverse_logical_plan(logical_plan)
assert variant is not None
assert variant.expr().to_variant().qualified_name() == 'table1.name'
assert str(variant.list()) == '[Expr(Utf8("dfa")), Expr(Utf8("ad")), Expr(Utf8("dfre")), Expr(Utf8("vsa"))]'
assert variant.expr().to_variant().qualified_name() == "table1.name"
assert (
str(variant.list())
== '[Expr(Utf8("dfa")), Expr(Utf8("ad")), Expr(Utf8("dfre")), Expr(Utf8("vsa"))]'
)
assert not variant.negated()
Loading
Loading