
Commit

Rename with* to *_ctx
Jay Chia committed Aug 23, 2024
1 parent 3812644 commit 2c42ee0
Showing 14 changed files with 35 additions and 35 deletions.
6 changes: 3 additions & 3 deletions daft/__init__.py
@@ -60,7 +60,7 @@ def refresh_logger() -> None:
# Daft top-level imports
###

from daft.context import set_execution_config, set_planning_config, with_execution_config, with_planning_config
from daft.context import set_execution_config, set_planning_config, execution_config_ctx, planning_config_ctx
from daft.convert import (
from_arrow,
from_dask_dataframe,
@@ -128,8 +128,8 @@ def refresh_logger() -> None:
"Schema",
"set_planning_config",
"set_execution_config",
"with_planning_config",
"with_execution_config",
"planning_config_ctx",
"execution_config_ctx",
"sql",
"sql_expr",
"to_struct",
4 changes: 2 additions & 2 deletions daft/context.py
@@ -246,7 +246,7 @@ def set_runner_py(use_thread_pool: bool | None = None) -> DaftContext:


@contextlib.contextmanager
def with_planning_config(**kwargs):
def planning_config_ctx(**kwargs):
"""Context manager that wraps set_planning_config to reset the config to its original setting afternwards"""
original_config = get_context().daft_planning_config
try:
@@ -282,7 +282,7 @@ def set_planning_config(


@contextlib.contextmanager
def with_execution_config(**kwargs):
def execution_config_ctx(**kwargs):
"""Context manager that wraps set_execution_config to reset the config to its original setting afternwards"""
original_config = get_context().daft_execution_config
try:
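The hunks above cut off before the full bodies of the renamed context managers. As a rough sketch only, reconstructed from the truncated lines shown here (the exact body in daft/context.py may differ), the pattern is: snapshot the current config, apply the overrides, and restore the snapshot in a finally block. The restore call assumes set_execution_config accepts a config= keyword, which the ray_runner hunks below suggest.

```python
import contextlib

from daft.context import get_context, set_execution_config


@contextlib.contextmanager
def execution_config_ctx(**kwargs):
    """Wraps set_execution_config and resets the config to its original setting afterwards."""
    # Snapshot the config that is active when the block is entered.
    original_config = get_context().daft_execution_config
    try:
        # Apply the requested overrides for the duration of the block.
        set_execution_config(**kwargs)
        yield
    finally:
        # Restore the snapshot even if the body raised.
        set_execution_config(config=original_config)
```

planning_config_ctx presumably follows the same pattern with get_context().daft_planning_config and set_planning_config.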
10 changes: 5 additions & 5 deletions daft/runners/ray_runner.py
@@ -10,7 +10,7 @@

import pyarrow as pa

from daft.context import get_context, with_execution_config
from daft.context import execution_config_ctx, get_context
from daft.logical.builder import LogicalPlanBuilder
from daft.plan_scheduler import PhysicalPlanScheduler
from daft.runners.progress_bar import ProgressBar
@@ -350,7 +350,7 @@ def single_partition_pipeline(
partial_metadatas: list[PartitionMetadata],
*inputs: MicroPartition,
) -> list[list[PartitionMetadata] | MicroPartition]:
with with_execution_config(
with execution_config_ctx(
config=daft_execution_config,
):
return build_partitions(instruction_stack, partial_metadatas, *inputs)
@@ -363,7 +363,7 @@ def fanout_pipeline(
partial_metadatas: list[PartitionMetadata],
*inputs: MicroPartition,
) -> list[list[PartitionMetadata] | MicroPartition]:
with with_execution_config(config=daft_execution_config):
with execution_config_ctx(config=daft_execution_config):
return build_partitions(instruction_stack, partial_metadatas, *inputs)


@@ -376,7 +376,7 @@ def reduce_pipeline(
) -> list[list[PartitionMetadata] | MicroPartition]:
import ray

with with_execution_config(config=daft_execution_config):
with execution_config_ctx(config=daft_execution_config):
return build_partitions(instruction_stack, partial_metadatas, *ray.get(inputs))


@@ -389,7 +389,7 @@ def reduce_and_fanout(
) -> list[list[PartitionMetadata] | MicroPartition]:
import ray

with with_execution_config(config=daft_execution_config):
with execution_config_ctx(config=daft_execution_config):
return build_partitions(instruction_stack, partial_metadatas, *ray.get(inputs))


4 changes: 2 additions & 2 deletions docs/source/api_docs/configs.rst
@@ -23,9 +23,9 @@ Configure Daft in various ways during execution.
:toctree: doc_gen/configuration_functions

daft.set_planning_config
daft.with_planning_config
daft.planning_config_ctx
daft.set_execution_config
daft.with_execution_config
daft.execution_config_ctx

I/O Configurations
******************
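For context (not part of this commit's diff), here is how the renamed, documented helpers are typically used: keyword arguments are forwarded to the corresponding set_*_config call and the previous config is restored on exit. The scan-task kwargs below are the ones exercised in the tests later in this commit; treat the snippet as an illustrative sketch rather than canonical docs.

```python
import daft

# Temporarily disable ScanTask merging so each small file becomes its own task;
# the previous execution config is restored when the block exits.
with daft.execution_config_ctx(
    scan_tasks_min_size_bytes=0,
    scan_tasks_max_size_bytes=0,
):
    df = daft.from_pydict({"x": [1, 2, 3]})
    df.collect()

# Planning-time options are scoped the same way; kwargs are forwarded
# to set_planning_config (none are overridden in this illustration).
with daft.planning_config_ctx():
    daft.from_pydict({"y": [4, 5, 6]}).collect()
```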
6 changes: 3 additions & 3 deletions tests/benchmarks/test_local_tpch.py
@@ -33,7 +33,7 @@ def gen_tpch(request):
csv_files_location = data_generation.gen_csv_files(TPCH_DBGEN_DIR, num_parts, SCALE_FACTOR)

# Disable native executor to generate parquet files, remove once native executor supports writing parquet files
with daft.context.with_execution_config(enable_native_executor=False):
with daft.context.execution_config_ctx(enable_native_executor=False):
parquet_files_location = data_generation.gen_parquet(csv_files_location)

in_memory_tables = {}
@@ -106,9 +106,9 @@ def test_tpch(tmp_path, check_answer, get_df, benchmark_with_memray, engine, q):

def f():
if engine == "native":
ctx = daft.context.with_execution_config(enable_native_executor=True)
ctx = daft.context.execution_config_ctx(enable_native_executor=True)
elif engine == "python":
ctx = daft.context.with_execution_config(enable_native_executor=False)
ctx = daft.context.execution_config_ctx(enable_native_executor=False)
else:
raise ValueError(f"{engine} unsupported")

4 changes: 2 additions & 2 deletions tests/conftest.py
@@ -72,7 +72,7 @@ def join_strategy(request):
if request.param != "sort_merge_aligned_boundaries":
yield request.param
else:
with daft.with_execution_config(sort_merge_join_sort_with_aligned_boundaries=True):
with daft.execution_config_ctx(sort_merge_join_sort_with_aligned_boundaries=True):
yield "sort_merge"


@@ -119,7 +119,7 @@ def _make_df(
else:
raise NotImplementedError(f"make_df not implemented for: {variant}")

with daft.with_execution_config(
with daft.execution_config_ctx(
# Disables merging of ScanTasks of Parquet when reading small Parquet files
scan_tasks_min_size_bytes=0,
scan_tasks_max_size_bytes=0,
2 changes: 1 addition & 1 deletion tests/cookbook/test_write.py
@@ -160,7 +160,7 @@ def test_parquet_write_with_null_values(tmp_path):

@pytest.fixture()
def smaller_parquet_target_filesize():
with daft.with_execution_config(parquet_target_filesize=1024):
with daft.execution_config_ctx(parquet_target_filesize=1024):
yield


2 changes: 1 addition & 1 deletion tests/dataframe/test_aggregations.py
@@ -365,7 +365,7 @@ def test_groupby_result_partitions_smaller_than_input(shuffle_aggregation_defaul
else:
min_partitions = shuffle_aggregation_default_partitions

with daft.with_execution_config(shuffle_aggregation_default_partitions=shuffle_aggregation_default_partitions):
with daft.execution_config_ctx(shuffle_aggregation_default_partitions=shuffle_aggregation_default_partitions):
for partition_size in [1, min_partitions, min_partitions + 1]:
df = daft.from_pydict(
{"group": [i for i in range(min_partitions + 1)], "value": [i for i in range(min_partitions + 1)]}
2 changes: 1 addition & 1 deletion tests/integration/io/parquet/test_reads_public_data.py
@@ -217,7 +217,7 @@ def set_split_config(request):
max_size = 0 if request.param[0] else 384 * 1024 * 1024
min_size = 0 if request.param[1] else 96 * 1024 * 1024

with daft.with_execution_config(
with daft.execution_config_ctx(
scan_tasks_max_size_bytes=max_size,
scan_tasks_min_size_bytes=min_size,
):
6 changes: 3 additions & 3 deletions tests/integration/sql/test_sql.py
@@ -35,7 +35,7 @@ def test_sql_create_dataframe_ok(test_db, pdf) -> None:
def test_sql_partitioned_read(test_db, num_partitions, pdf) -> None:
row_size_bytes = daft.from_pandas(pdf).schema().estimate_row_size_bytes()
num_rows_per_partition = len(pdf) / num_partitions
with daft.with_execution_config(
with daft.execution_config_ctx(
read_sql_partition_size_bytes=math.ceil(row_size_bytes * num_rows_per_partition),
scan_tasks_min_size_bytes=0,
scan_tasks_max_size_bytes=0,
@@ -51,7 +51,7 @@ def test_sql_partitioned_read(test_db, num_partitions, pdf) -> None:
def test_sql_partitioned_read_with_custom_num_partitions_and_partition_col(
test_db, num_partitions, partition_col, pdf
) -> None:
with daft.with_execution_config(
with daft.execution_config_ctx(
scan_tasks_min_size_bytes=0,
scan_tasks_max_size_bytes=0,
):
@@ -68,7 +68,7 @@ def test_sql_partitioned_read_with_custom_num_partitions_and_partition_col(
@pytest.mark.integration()
@pytest.mark.parametrize("num_partitions", [1, 2, 3, 4])
def test_sql_partitioned_read_with_non_uniformly_distributed_column(test_db, num_partitions, pdf) -> None:
with daft.with_execution_config(
with daft.execution_config_ctx(
scan_tasks_min_size_bytes=0,
scan_tasks_max_size_bytes=0,
):
6 changes: 3 additions & 3 deletions tests/io/delta_lake/test_table_read.py
@@ -51,7 +51,7 @@ def test_deltalake_read_row_group_splits(tmp_path, base_table):
deltalake.write_deltalake(path, base_table, min_rows_per_group=1, max_rows_per_group=2)

# Force file splitting
with daft.with_execution_config(
with daft.execution_config_ctx(
scan_tasks_min_size_bytes=1,
scan_tasks_max_size_bytes=100,
):
@@ -68,7 +68,7 @@ def test_deltalake_read_row_group_splits_with_filter(tmp_path, base_table):
deltalake.write_deltalake(path, base_table, min_rows_per_group=1, max_rows_per_group=2)

# Force file splitting
with daft.with_execution_config(
with daft.execution_config_ctx(
scan_tasks_min_size_bytes=1,
scan_tasks_max_size_bytes=100,
):
@@ -86,7 +86,7 @@ def test_deltalake_read_row_group_splits_with_limit(tmp_path, base_table):
deltalake.write_deltalake(path, base_table, min_rows_per_group=1, max_rows_per_group=2)

# Force file splitting
with daft.with_execution_config(
with daft.execution_config_ctx(
scan_tasks_min_size_bytes=1,
scan_tasks_max_size_bytes=100,
):
8 changes: 4 additions & 4 deletions tests/io/test_merge_scan_tasks.py
@@ -17,7 +17,7 @@ def csv_files(tmpdir):


def test_merge_scan_task_exceed_max(csv_files):
with daft.with_execution_config(
with daft.execution_config_ctx(
scan_tasks_min_size_bytes=0,
scan_tasks_max_size_bytes=0,
):
@@ -28,7 +28,7 @@ def test_merge_scan_task_exceed_max(csv_files):


def test_merge_scan_task_below_max(csv_files):
with daft.with_execution_config(
with daft.execution_config_ctx(
scan_tasks_min_size_bytes=11,
scan_tasks_max_size_bytes=12,
):
@@ -39,7 +39,7 @@ def test_merge_scan_task_below_max(csv_files):


def test_merge_scan_task_above_min(csv_files):
with daft.with_execution_config(
with daft.execution_config_ctx(
scan_tasks_min_size_bytes=9,
scan_tasks_max_size_bytes=20,
):
@@ -50,7 +50,7 @@ def test_merge_scan_task_above_min(csv_files):


def test_merge_scan_task_below_min(csv_files):
with daft.with_execution_config(
with daft.execution_config_ctx(
scan_tasks_min_size_bytes=17,
scan_tasks_max_size_bytes=20,
):
2 changes: 1 addition & 1 deletion tests/io/test_split_scan_tasks.py
@@ -18,7 +18,7 @@ def parquet_files(tmpdir):


def test_split_parquet_read(parquet_files):
with daft.with_execution_config(
with daft.execution_config_ctx(
scan_tasks_min_size_bytes=1,
scan_tasks_max_size_bytes=10,
):
8 changes: 4 additions & 4 deletions tests/ray/runner.py
@@ -8,7 +8,7 @@

@pytest.mark.skipif(get_context().runner_config.name != "ray", reason="Needs to run on Ray runner")
def test_active_plan_clean_up_df_show():
with daft.with_execution_config(
with daft.execution_config_ctx(
scan_tasks_min_size_bytes=0,
scan_tasks_max_size_bytes=0,
):
@@ -21,7 +21,7 @@ def test_active_plan_clean_up_df_show():

@pytest.mark.skipif(get_context().runner_config.name != "ray", reason="Needs to run on Ray runner")
def test_active_plan_single_iter_partitions():
with daft.with_execution_config(
with daft.execution_config_ctx(
scan_tasks_min_size_bytes=0,
scan_tasks_max_size_bytes=0,
):
@@ -37,7 +37,7 @@ def test_active_plan_single_iter_partitions():

@pytest.mark.skipif(get_context().runner_config.name != "ray", reason="Needs to run on Ray runner")
def test_active_plan_multiple_iter_partitions():
with daft.with_execution_config(
with daft.execution_config_ctx(
scan_tasks_min_size_bytes=0,
scan_tasks_max_size_bytes=0,
):
@@ -62,7 +62,7 @@ def test_active_plan_multiple_iter_partitions():

@pytest.mark.skipif(get_context().runner_config.name != "ray", reason="Needs to run on Ray runner")
def test_active_plan_with_show_and_write_parquet(tmpdir):
with daft.with_execution_config(
with daft.execution_config_ctx(
scan_tasks_min_size_bytes=0,
scan_tasks_max_size_bytes=0,
):
