Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CHORE] Swordfish specific test fixtures #3164

Merged
merged 7 commits into from
Nov 5, 2024
Merged

Conversation

colin-ho
Copy link
Contributor

@colin-ho colin-ho commented Nov 1, 2024

This PR sets up a few swordfish related test fixtures, specifically:

  • Parameterize default_morsel_size = [1, None] for dataframe tests that do any into/repartitioning. This is to make sure that the operator parallelism is working.
  • Setup iteration tests in test_iter.py
  • Makes the ordering assertions stricter on some tests. E.g. some tests do assert df.sort(col) == expected, but there are other columns in df that may not be sorted, and this won't be enough if morsel_size = 1. This isn't a problem with swordfish but the test, where the sort should actually involve more columns.

Big note: There was a problem with pivot not getting applied correctly. This is because a dataframe pivot operation comprises of an agg + the actual pivoting, but previously the pivot was implemented as an intermediate operator, and the results of the agg were getting buffered. In order for the pivot to work it has to receive all values with the same group_by keys. This PR implements simplifies Pivot as a BlockingSink, so all the work is in there.

@github-actions github-actions bot added the chore label Nov 1, 2024
Copy link

codspeed-hq bot commented Nov 2, 2024

CodSpeed Performance Report

Merging #3164 will not alter performance

Comparing colin/swordfish-testing (1a19329) with main (3cef614)

Summary

✅ 17 untouched benchmarks

Copy link

codecov bot commented Nov 2, 2024

Codecov Report

Attention: Patch coverage is 96.55172% with 2 lines in your changes missing coverage. Please review.

Project coverage is 79.02%. Comparing base (9d4adfb) to head (1a19329).
Report is 2 commits behind head on main.

Files with missing lines Patch % Lines
src/daft-local-execution/src/sinks/pivot.rs 95.91% 2 Missing ⚠️
Additional details and impacted files

Impacted file tree graph

@@            Coverage Diff             @@
##             main    #3164      +/-   ##
==========================================
+ Coverage   79.01%   79.02%   +0.01%     
==========================================
  Files         634      634              
  Lines       76942    76962      +20     
==========================================
+ Hits        60792    60823      +31     
+ Misses      16150    16139      -11     
Files with missing lines Coverage Δ
src/daft-local-execution/src/pipeline.rs 94.83% <100.00%> (ø)
src/daft-physical-plan/src/local_plan.rs 96.12% <100.00%> (+0.03%) ⬆️
src/daft-physical-plan/src/translate.rs 92.66% <100.00%> (-0.75%) ⬇️
src/daft-local-execution/src/sinks/pivot.rs 95.91% <95.91%> (ø)

... and 9 files with indirect coverage changes

@colin-ho colin-ho marked this pull request as ready for review November 2, 2024 01:10
@colin-ho colin-ho requested a review from jaychia November 2, 2024 01:11
@@ -13,6 +14,12 @@
###


@pytest.fixture(scope="function", autouse=True)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should preferably call the context manager explicitly in our tests, or pass the fixture into tests when we want to use it -- we've had issues in the past where autouse makes it confusing for people debugging our unit test

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense, making the fixture explicit.

src/daft-local-execution/src/pipeline.rs Outdated Show resolved Hide resolved
src/daft-local-execution/src/sinks/pivot.rs Show resolved Hide resolved
&[aggregation.clone()],
&aggregate_schema.into(),
&group_by_with_pivot,
);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't the two-stage agg only necessary for partitioned distributed computations? To my knowledge, we do this 2-stage thing so that we can perform a local agg (as an optimization to reduce data cardinality) before the shuffle, and then perform the second local agg + final project to correctly execute the operation.

For local computations on swordfish, would it not be more performant to just perform a fully-materializing single stage aggregation before the pivot?

Copy link
Contributor Author

@colin-ho colin-ho Nov 2, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It was actually faster to still use the two-stage agg pattern on swordfish, vs full-materialize then agg (at least for the tpch questions), so i kept it around.

But, the simpler and way more effective way to do it local would be to do a .fold like pattern. This is something I plan on doing.

I probably shouldn't use the two stage for pivot then, will remove it and keep it simple.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Modified it to be a fully-materializing single stage agg.

@colin-ho colin-ho requested a review from jaychia November 4, 2024 21:03
def with_morsel_size(request):
morsel_size = request.param
with daft.context.execution_config_ctx(default_morsel_size=morsel_size):
yield
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: maybe yield the morsel size

@pytest.fixture(
scope="module",
params=[1, 2] if daft.context.get_context().daft_execution_config.enable_native_executor is False else [1],
)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM I think, but in the future is should really just be if daft.context.get_context().runner == "ray" since partitioning only makes sense for the ray runner

@jaychia
Copy link
Contributor

jaychia commented Nov 4, 2024

One last comment -- if we're concerned with verbosity of passing with_morsel_size everywhere, I think I'm ok with adding a scope="module" fixture on each file where we want to use with_morsel_size so that we have an explicit per-file marker of usage of this fixture.

@colin-ho colin-ho merged commit 96c538b into main Nov 5, 2024
42 checks passed
@colin-ho colin-ho deleted the colin/swordfish-testing branch November 5, 2024 00:57
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants