Skip to content

Commit

Permalink
Run ruff format in CI (#837)
Browse files Browse the repository at this point in the history
* Run ruff format in CI

* Add --check parameter

* Apply ruff format
  • Loading branch information
timsaucer authored Aug 27, 2024
1 parent 22c70ef commit 766e2ed
Show file tree
Hide file tree
Showing 7 changed files with 251 additions and 153 deletions.
4 changes: 3 additions & 1 deletion .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,9 @@ jobs:
pip install ruff
# Update output format to enable automatic inline annotations.
- name: Run Ruff
run: ruff check --output-format=github python/
run: |
ruff check --output-format=github python/
ruff format --check python/
generate-license:
runs-on: ubuntu-latest
Expand Down
9 changes: 7 additions & 2 deletions python/datafusion/functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -1479,12 +1479,17 @@ def approx_percentile_cont(
"""Returns the value that is approximately at a given percentile of ``expr``."""
if num_centroids is None:
return Expr(
f.approx_percentile_cont(expression.expr, percentile.expr, distinct=distinct, num_centroids=None)
f.approx_percentile_cont(
expression.expr, percentile.expr, distinct=distinct, num_centroids=None
)
)

return Expr(
f.approx_percentile_cont(
expression.expr, percentile.expr, distinct=distinct, num_centroids=num_centroids.expr
expression.expr,
percentile.expr,
distinct=distinct,
num_centroids=num_centroids.expr,
)
)

Expand Down
105 changes: 62 additions & 43 deletions python/datafusion/tests/test_aggregation.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,56 +39,74 @@ def df():
)
return ctx.create_dataframe([[batch]])


@pytest.fixture
def df_aggregate_100():
ctx = SessionContext()
ctx.register_csv("aggregate_test_data", "./testing/data/csv/aggregate_test_100.csv")
return ctx.table("aggregate_test_data")


@pytest.mark.parametrize("agg_expr, calc_expected", [
(f.avg(column("a")), lambda a, b, c, d: np.array(np.average(a))),
(f.corr(column("a"), column("b")), lambda a, b, c, d: np.array(np.corrcoef(a, b)[0][1])),
(f.count(column("a")), lambda a, b, c, d: pa.array([len(a)])),
# Sample (co)variance -> ddof=1
# Population (co)variance -> ddof=0
(f.covar(column("a"), column("b")), lambda a, b, c, d: np.array(np.cov(a, b, ddof=1)[0][1])),
(f.covar_pop(column("a"), column("c")), lambda a, b, c, d: np.array(np.cov(a, c, ddof=0)[0][1])),
(f.covar_samp(column("b"), column("c")), lambda a, b, c, d: np.array(np.cov(b, c, ddof=1)[0][1])),
# f.grouping(col_a), # No physical plan implemented yet
(f.max(column("a")), lambda a, b, c, d: np.array(np.max(a))),
(f.mean(column("b")), lambda a, b, c, d: np.array(np.mean(b))),
(f.median(column("b")), lambda a, b, c, d: np.array(np.median(b))),
(f.min(column("a")), lambda a, b, c, d: np.array(np.min(a))),
(f.sum(column("b")), lambda a, b, c, d: np.array(np.sum(b.to_pylist()))),
# Sample stdev -> ddof=1
# Population stdev -> ddof=0
(f.stddev(column("a")), lambda a, b, c, d: np.array(np.std(a, ddof=1))),
(f.stddev_pop(column("b")), lambda a, b, c, d: np.array(np.std(b, ddof=0))),
(f.stddev_samp(column("c")), lambda a, b, c, d: np.array(np.std(c, ddof=1))),
(f.var(column("a")), lambda a, b, c, d: np.array(np.var(a, ddof=1))),
(f.var_pop(column("b")), lambda a, b, c, d: np.array(np.var(b, ddof=0))),
(f.var_samp(column("c")), lambda a, b, c, d: np.array(np.var(c, ddof=1))),
])
@pytest.mark.parametrize(
"agg_expr, calc_expected",
[
(f.avg(column("a")), lambda a, b, c, d: np.array(np.average(a))),
(
f.corr(column("a"), column("b")),
lambda a, b, c, d: np.array(np.corrcoef(a, b)[0][1]),
),
(f.count(column("a")), lambda a, b, c, d: pa.array([len(a)])),
# Sample (co)variance -> ddof=1
# Population (co)variance -> ddof=0
(
f.covar(column("a"), column("b")),
lambda a, b, c, d: np.array(np.cov(a, b, ddof=1)[0][1]),
),
(
f.covar_pop(column("a"), column("c")),
lambda a, b, c, d: np.array(np.cov(a, c, ddof=0)[0][1]),
),
(
f.covar_samp(column("b"), column("c")),
lambda a, b, c, d: np.array(np.cov(b, c, ddof=1)[0][1]),
),
# f.grouping(col_a), # No physical plan implemented yet
(f.max(column("a")), lambda a, b, c, d: np.array(np.max(a))),
(f.mean(column("b")), lambda a, b, c, d: np.array(np.mean(b))),
(f.median(column("b")), lambda a, b, c, d: np.array(np.median(b))),
(f.min(column("a")), lambda a, b, c, d: np.array(np.min(a))),
(f.sum(column("b")), lambda a, b, c, d: np.array(np.sum(b.to_pylist()))),
# Sample stdev -> ddof=1
# Population stdev -> ddof=0
(f.stddev(column("a")), lambda a, b, c, d: np.array(np.std(a, ddof=1))),
(f.stddev_pop(column("b")), lambda a, b, c, d: np.array(np.std(b, ddof=0))),
(f.stddev_samp(column("c")), lambda a, b, c, d: np.array(np.std(c, ddof=1))),
(f.var(column("a")), lambda a, b, c, d: np.array(np.var(a, ddof=1))),
(f.var_pop(column("b")), lambda a, b, c, d: np.array(np.var(b, ddof=0))),
(f.var_samp(column("c")), lambda a, b, c, d: np.array(np.var(c, ddof=1))),
],
)
def test_aggregation_stats(df, agg_expr, calc_expected):

agg_df = df.aggregate([], [agg_expr])
result = agg_df.collect()[0]
values_a, values_b, values_c, values_d = df.collect()[0]
expected = calc_expected(values_a, values_b, values_c, values_d)
np.testing.assert_array_almost_equal(result.column(0), expected)


@pytest.mark.parametrize("agg_expr, expected", [
(f.approx_distinct(column("b")), pa.array([2], type=pa.uint64())),
(f.approx_median(column("b")), pa.array([4])),
(f.approx_percentile_cont(column("b"), lit(0.5)), pa.array([4])),
(
f.approx_percentile_cont_with_weight(column("b"), lit(0.6), lit(0.5)),
pa.array([6], type=pa.float64())
),
(f.array_agg(column("b")), pa.array([[4, 4, 6]])),
])
@pytest.mark.parametrize(
"agg_expr, expected",
[
(f.approx_distinct(column("b")), pa.array([2], type=pa.uint64())),
(f.approx_median(column("b")), pa.array([4])),
(f.approx_percentile_cont(column("b"), lit(0.5)), pa.array([4])),
(
f.approx_percentile_cont_with_weight(column("b"), lit(0.6), lit(0.5)),
pa.array([6], type=pa.float64()),
),
(f.array_agg(column("b")), pa.array([[4, 4, 6]])),
],
)
def test_aggregation(df, agg_expr, expected):
agg_df = df.aggregate([], [agg_expr])
result = agg_df.collect()[0]
Expand All @@ -98,20 +116,21 @@ def test_aggregation(df, agg_expr, expected):
def test_aggregate_100(df_aggregate_100):
# https://github.com/apache/datafusion/blob/bddb6415a50746d2803dd908d19c3758952d74f9/datafusion/sqllogictest/test_files/aggregate.slt#L1490-L1498

result = df_aggregate_100.aggregate(
[
column("c1")
],
[
f.approx_percentile_cont(column("c3"), lit(0.95), lit(200)).alias("c3")
]
).sort(column("c1").sort(ascending=True)).collect()
result = (
df_aggregate_100.aggregate(
[column("c1")],
[f.approx_percentile_cont(column("c3"), lit(0.95), lit(200)).alias("c3")],
)
.sort(column("c1").sort(ascending=True))
.collect()
)

assert len(result) == 1
result = result[0]
assert result.column("c1") == pa.array(["a", "b", "c", "d", "e"])
assert result.column("c3") == pa.array([73, 68, 122, 124, 115])


def test_bit_add_or_xor(df):
df = df.aggregate(
[],
Expand Down
90 changes: 50 additions & 40 deletions python/datafusion/tests/test_dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -279,57 +279,67 @@ def test_distinct():


data_test_window_functions = [
("row", f.window("row_number", [], order_by=[f.order_by(column("c"))]), [2, 1, 3]),
("rank", f.window("rank", [], order_by=[f.order_by(column("c"))]), [2, 1, 2]),
("dense_rank", f.window("dense_rank", [], order_by=[f.order_by(column("c"))]), [2, 1, 2] ),
("percent_rank", f.window("percent_rank", [], order_by=[f.order_by(column("c"))]), [0.5, 0, 0.5]),
("cume_dist", f.window("cume_dist", [], order_by=[f.order_by(column("b"))]), [0.3333333333333333, 0.6666666666666666, 1.0]),
("ntile", f.window("ntile", [literal(2)], order_by=[f.order_by(column("c"))]), [1, 1, 2]),
("next", f.window("lead", [column("b")], order_by=[f.order_by(column("b"))]), [5, 6, None]),
("previous", f.window("lag", [column("b")], order_by=[f.order_by(column("b"))]), [None, 4, 5]),
pytest.param(
"first_value",
f.window(
("row", f.window("row_number", [], order_by=[f.order_by(column("c"))]), [2, 1, 3]),
("rank", f.window("rank", [], order_by=[f.order_by(column("c"))]), [2, 1, 2]),
(
"dense_rank",
f.window("dense_rank", [], order_by=[f.order_by(column("c"))]),
[2, 1, 2],
),
(
"percent_rank",
f.window("percent_rank", [], order_by=[f.order_by(column("c"))]),
[0.5, 0, 0.5],
),
(
"cume_dist",
f.window("cume_dist", [], order_by=[f.order_by(column("b"))]),
[0.3333333333333333, 0.6666666666666666, 1.0],
),
(
"ntile",
f.window("ntile", [literal(2)], order_by=[f.order_by(column("c"))]),
[1, 1, 2],
),
(
"next",
f.window("lead", [column("b")], order_by=[f.order_by(column("b"))]),
[5, 6, None],
),
(
"previous",
f.window("lag", [column("b")], order_by=[f.order_by(column("b"))]),
[None, 4, 5],
),
pytest.param(
"first_value",
[column("a")],
order_by=[f.order_by(column("b"))]
f.window("first_value", [column("a")], order_by=[f.order_by(column("b"))]),
[1, 1, 1],
),
pytest.param(
"last_value",
f.window("last_value", [column("b")], order_by=[f.order_by(column("b"))]),
[4, 5, 6],
),
[1, 1, 1],
),
pytest.param(
"last_value",
f.window("last_value", [column("b")], order_by=[f.order_by(column("b"))]),
[4, 5, 6],
),
pytest.param(
"2nd_value",
f.window(
"nth_value",
[column("b"), literal(2)],
order_by=[f.order_by(column("b"))],
pytest.param(
"2nd_value",
f.window(
"nth_value",
[column("b"), literal(2)],
order_by=[f.order_by(column("b"))],
),
[None, 5, 5],
),
[None, 5, 5],
),
]


@pytest.mark.parametrize("name,expr,result", data_test_window_functions)
def test_window_functions(df, name, expr, result):
df = df.select(
column("a"),
column("b"),
column("c"),
f.alias(expr, name)
)
df = df.select(column("a"), column("b"), column("c"), f.alias(expr, name))

table = pa.Table.from_batches(df.collect())

expected = {
"a": [1, 2, 3],
"b": [4, 5, 6],
"c": [8, 5, 8],
name: result
}
expected = {"a": [1, 2, 3], "b": [4, 5, 6], "c": [8, 5, 8], name: result}

assert table.sort_by("a").to_pydict() == expected

Expand Down
14 changes: 8 additions & 6 deletions python/datafusion/tests/test_expr.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,24 +146,26 @@ def test_expr_to_variant():
from datafusion import SessionContext
from datafusion.expr import Filter


def traverse_logical_plan(plan):
cur_node = plan.to_variant()
if isinstance(cur_node, Filter):
return cur_node.predicate().to_variant()
if hasattr(plan, 'inputs'):
if hasattr(plan, "inputs"):
for input_plan in plan.inputs():
res = traverse_logical_plan(input_plan)
if res is not None:
return res

ctx = SessionContext()
data = {'id': [1, 2, 3], 'name': ['Alice', 'Bob', 'Charlie']}
ctx.from_pydict(data, name='table1')
data = {"id": [1, 2, 3], "name": ["Alice", "Bob", "Charlie"]}
ctx.from_pydict(data, name="table1")
query = "SELECT * FROM table1 t1 WHERE t1.name IN ('dfa', 'ad', 'dfre', 'vsa')"
logical_plan = ctx.sql(query).optimized_logical_plan()
variant = traverse_logical_plan(logical_plan)
assert variant is not None
assert variant.expr().to_variant().qualified_name() == 'table1.name'
assert str(variant.list()) == '[Expr(Utf8("dfa")), Expr(Utf8("ad")), Expr(Utf8("dfre")), Expr(Utf8("vsa"))]'
assert variant.expr().to_variant().qualified_name() == "table1.name"
assert (
str(variant.list())
== '[Expr(Utf8("dfa")), Expr(Utf8("ad")), Expr(Utf8("dfre")), Expr(Utf8("vsa"))]'
)
assert not variant.negated()
Loading

0 comments on commit 766e2ed

Please sign in to comment.