[CHORE] refactors for ruff [1/n] (#2120)
samster25 authored Apr 15, 2024
1 parent b4a6680 commit e2c6fec
Showing 50 changed files with 385 additions and 393 deletions.
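The edits below are the kind of findings ruff reports from its pyflakes/pycodestyle rule families: extraneous f-string prefixes (F541), raise NotImplemented instead of NotImplementedError (F901), ambiguous single-letter names (E741), == None comparisons (E711), shadowed names and star imports (F811/F401/F403), and unused local assignments (F841). A minimal sketch of driving such a pass from a script follows; the rule selection and paths are illustrative assumptions, not the repository's actual lint configuration.

# Illustrative only: invoke ruff from Python. Assumes `pip install ruff`;
# the --select list is a guess at the rules touched by this commit.
import subprocess

RULES = "F401,F403,F541,F811,F841,F901,E711,E741"

def run_ruff(paths: list[str], fix: bool = False) -> int:
    cmd = ["ruff", "check", f"--select={RULES}"]
    if fix:
        cmd.append("--fix")  # let ruff rewrite what it safely can, e.g. f"..." -> "..."
    cmd.extend(paths)
    return subprocess.run(cmd, check=False).returncode

if __name__ == "__main__":
    raise SystemExit(run_ruff(["daft", "benchmarking", "tests"]))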
2 changes: 1 addition & 1 deletion benchmarking/tpch/__main__.py
@@ -177,7 +177,7 @@ def run_all_benchmarks(
logger.info(f"Writing CSV to: {csv_output_location}")
metrics_builder.dump_csv(csv_output_location)
else:
- logger.info(f"No CSV location specified, skipping CSV write")
+ logger.info("No CSV location specified, skipping CSV write")


def generate_parquet_data(tpch_gen_folder: str, scale_factor: float, num_parts: int) -> str:
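The single change above is typical of ruff's F541: an f-string with no placeholders is just a plain string carrying a misleading prefix. A tiny self-contained illustration, not taken from the benchmark code:

# F541: the f prefix only matters when the string contains {placeholders}.
csv_output_location = "/tmp/metrics.csv"
with_placeholder = f"Writing CSV to: {csv_output_location}"  # interpolation actually happens
without_placeholder = "No CSV location specified, skipping CSV write"  # plain string is enough
assert with_placeholder == "Writing CSV to: /tmp/metrics.csv"
print(without_placeholder)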
6 changes: 3 additions & 3 deletions benchmarking/tpch/pipelined_data_generation.py
@@ -31,9 +31,9 @@


def batch(iterable, n=1):
- l = len(iterable)
- for ndx in range(0, l, n):
- yield iterable[ndx : min(ndx + n, l)]
+ length = len(iterable)
+ for ndx in range(0, length, n):
+ yield iterable[ndx : min(ndx + n, length)]


def gen_csv(part_idx: int, cachedir: str, scale_factor: float, num_parts: int):
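The rename above addresses E741: the name l is easily confused with 1 or I. For context, a self-contained version of the batch helper as it reads after the change, with a small usage check; this is a sketch reconstructed from the hunk, not the full module.

def batch(iterable, n=1):
    # Yield successive slices of at most n items; `length` replaces the
    # single-letter `l` that ruff flags under E741.
    length = len(iterable)
    for ndx in range(0, length, n):
        yield iterable[ndx : min(ndx + n, length)]

if __name__ == "__main__":
    print(list(batch(list(range(7)), n=3)))  # [[0, 1, 2], [3, 4, 5], [6]]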
4 changes: 2 additions & 2 deletions daft/dataframe/dataframe.py
@@ -72,11 +72,11 @@ def __init__(self, builder: LogicalPlanBuilder) -> None:
if not isinstance(builder, LogicalPlanBuilder):
if isinstance(builder, dict):
raise ValueError(
- f"DataFrames should be constructed with a dictionary of columns using `daft.from_pydict`"
+ "DataFrames should be constructed with a dictionary of columns using `daft.from_pydict`"
)
if isinstance(builder, list):
raise ValueError(
- f"DataFrames should be constructed with a list of dictionaries using `daft.from_pylist`"
+ "DataFrames should be constructed with a list of dictionaries using `daft.from_pylist`"
)
raise ValueError(f"Expected DataFrame to be constructed with a LogicalPlanBuilder, received: {builder}")

2 changes: 1 addition & 1 deletion daft/dataframe/to_torch.py
@@ -18,7 +18,7 @@
MAP_DATASET_CLASS = torch.utils.data.Dataset
ITER_DATASET_CLASS = torch.utils.data.IterableDataset
except ImportError:
- logger.error(f"Error when importing Torch. To use PyTorch features, please install torch.")
+ logger.error("Error when importing Torch. To use PyTorch features, please install torch.")
raise


4 changes: 2 additions & 2 deletions daft/expressions/expressions.py
@@ -636,7 +636,7 @@ def download(
elif on_error == "null":
raise_on_error = False
else:
- raise NotImplemented(f"Unimplemented on_error option: {on_error}.")
+ raise NotImplementedError(f"Unimplemented on_error option: {on_error}.")

if not (isinstance(max_connections, int) and max_connections > 0):
raise ValueError(f"Invalid value for `max_connections`: {max_connections}")
@@ -1375,7 +1375,7 @@ def decode(self, on_error: Literal["raise"] | Literal["null"] = "raise") -> Expr
elif on_error == "null":
raise_on_error = False
else:
- raise NotImplemented(f"Unimplemented on_error option: {on_error}.")
+ raise NotImplementedError(f"Unimplemented on_error option: {on_error}.")

return Expression._from_pyexpr(self._expr.image_decode(raise_error_on_failure=raise_on_error))

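Both hunks above fix the same latent bug: NotImplemented is the sentinel that binary dunder methods return, not an exception class, so raise NotImplemented(...) dies with a TypeError at the raise site instead of surfacing the intended message (ruff flags the pattern as F901). A short demonstration, independent of the Daft code:

def choose_error_mode(on_error: str) -> bool:
    # Hypothetical helper mirroring the shape of the fixed code.
    if on_error == "raise":
        return True
    elif on_error == "null":
        return False
    raise NotImplementedError(f"Unimplemented on_error option: {on_error}.")

try:
    raise NotImplemented("boom")  # noqa: F901 -- broken form: NotImplemented is not callable
except TypeError as exc:
    print("raising NotImplemented fails with:", exc)

try:
    choose_error_mode("explode")
except NotImplementedError as exc:
    print("the fixed form raises the intended error:", exc)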
2 changes: 1 addition & 1 deletion daft/iceberg/iceberg_scan.py
@@ -157,7 +157,7 @@ def to_scan_tasks(self, pushdowns: Pushdowns) -> Iterator[ScanTask]:
raise NotImplementedError(f"{file_format} for iceberg not implemented!")

if len(task.delete_files) > 0:
- raise NotImplementedError(f"Iceberg Merge-on-Read currently not supported, please make an issue!")
+ raise NotImplementedError("Iceberg Merge-on-Read currently not supported, please make an issue!")

# TODO: Thread in Statistics to each ScanTask: P2
pspec = self._iceberg_record_to_partition_spec(file.partition)
2 changes: 1 addition & 1 deletion daft/io/_csv.py
@@ -65,7 +65,7 @@ def read_csv(
"blocker for your workflow!"
)
if isinstance(path, list) and len(path) == 0:
- raise ValueError(f"Cannot read DataFrame from from empty list of CSV filepaths")
+ raise ValueError("Cannot read DataFrame from from empty list of CSV filepaths")

io_config = context.get_context().daft_planning_config.default_io_config if io_config is None else io_config

2 changes: 1 addition & 1 deletion daft/io/_json.py
@@ -46,7 +46,7 @@ def read_json(
DataFrame: parsed DataFrame
"""
if isinstance(path, list) and len(path) == 0:
- raise ValueError(f"Cannot read DataFrame from from empty list of JSON filepaths")
+ raise ValueError("Cannot read DataFrame from from empty list of JSON filepaths")

io_config = context.get_context().daft_planning_config.default_io_config if io_config is None else io_config

2 changes: 1 addition & 1 deletion daft/io/_parquet.py
@@ -51,7 +51,7 @@ def read_parquet(
io_config = context.get_context().daft_planning_config.default_io_config if io_config is None else io_config

if isinstance(path, list) and len(path) == 0:
- raise ValueError(f"Cannot read DataFrame from from empty list of Parquet filepaths")
+ raise ValueError("Cannot read DataFrame from from empty list of Parquet filepaths")

# If running on Ray, we want to limit the amount of concurrency and requests being made.
# This is because each Ray worker process receives its own pool of thread workers and connections
2 changes: 1 addition & 1 deletion daft/runners/pyrunner.py
@@ -233,7 +233,7 @@ def _physical_plan_to_partitions(
# Await at least one task and process the results.
assert (
len(future_to_task) > 0
- ), f"Scheduler deadlocked! This should never happen. Please file an issue."
+ ), "Scheduler deadlocked! This should never happen. Please file an issue."
done_set, _ = futures.wait(list(future_to_task.keys()), return_when=futures.FIRST_COMPLETED)
for done_future in done_set:
done_id = future_to_task.pop(done_future)
4 changes: 2 additions & 2 deletions daft/runners/ray_runner.py
@@ -22,7 +22,7 @@
import ray
except ImportError:
logger.error(
- f"Error when importing Ray. Please ensure that getdaft was installed with the Ray extras tag: getdaft[ray] (https://www.getdaft.io/projects/docs/en/latest/learn/install.html)"
+ "Error when importing Ray. Please ensure that getdaft was installed with the Ray extras tag: getdaft[ray] (https://www.getdaft.io/projects/docs/en/latest/learn/install.html)"
)
raise

@@ -689,7 +689,7 @@ def __init__(
) -> None:
super().__init__()
if ray.is_initialized():
- logger.warning(f"Ray has already been initialized, Daft will reuse the existing Ray context.")
+ logger.warning("Ray has already been initialized, Daft will reuse the existing Ray context.")
self.ray_context = ray.init(address=address, ignore_reinit_error=True)

if isinstance(self.ray_context, ray.client_builder.ClientContext):
2 changes: 1 addition & 1 deletion daft/udf_library/url_udfs.py
@@ -46,7 +46,7 @@ def _download(path: str | None, on_error: Literal["raise"] | Literal["null"]) ->
logger.error(f"Encountered error during download from URL {path} and falling back to Null\n\n{e}: {str(e)}")
return None
else:
- raise NotImplemented(f"Unimplemented on_error option: {on_error}.\n\nEncountered error: {e}")
+ raise NotImplementedError(f"Unimplemented on_error option: {on_error}.\n\nEncountered error: {e}")


def _warmup_fsspec_registry(urls_pylist: list[str | None]) -> None:
8 changes: 6 additions & 2 deletions daft/utils.py
@@ -83,7 +83,8 @@ def map_operator_arrow_semantics_bool(
right_pylist: list,
) -> list[bool | None]:
return [
- bool(operator(l, r)) if (l is not None and r is not None) else None for (l, r) in zip(left_pylist, right_pylist)
+ bool(operator(left, right)) if (left is not None and right is not None) else None
+ for (left, right) in zip(left_pylist, right_pylist)
]


@@ -103,7 +104,10 @@ def map_operator_arrow_semantics(
left_pylist: list,
right_pylist: list,
) -> list:
- return [operator(l, r) if (l is not None and r is not None) else None for (l, r) in zip(left_pylist, right_pylist)]
+ return [
+ operator(left, right) if (left is not None and right is not None) else None
+ for (left, right) in zip(left_pylist, right_pylist)
+ ]


def pyarrow_supports_fixed_shape_tensor() -> bool:
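The two comprehensions above are reformatted (and the l/r names expanded) without changing behaviour: a binary operator is applied elementwise and None propagates whenever either input is missing, mirroring Arrow's null semantics. A standalone sketch with a usage example, assuming nothing else from daft.utils:

import operator

def map_operator_arrow_semantics(op, left_pylist: list, right_pylist: list) -> list:
    # Apply `op` pairwise; a None on either side yields None (null propagation).
    return [
        op(left, right) if (left is not None and right is not None) else None
        for (left, right) in zip(left_pylist, right_pylist)
    ]

print(map_operator_arrow_semantics(operator.add, [1, None, 3], [10, 20, None]))
# -> [11, None, None]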
11 changes: 11 additions & 0 deletions tests/conftest.py
@@ -9,6 +9,17 @@
import daft
from daft.table import MicroPartition

+ # import all conftest
+ from tests.integration.io.conftest import * # noqa: F403
+
+
+ def pytest_addoption(parser):
+ parser.addoption(
+ "--credentials",
+ action="store_true",
+ help="Whether or not the current environment has access to remote storage credentials",
+ )


@pytest.fixture(scope="session", autouse=True)
def set_execution_configs():
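The --credentials option added here is the same hook deleted from tests/integration/io/conftest.py further down this diff: pytest only honours pytest_addoption when it is defined in the rootdir conftest.py or a plugin, which is presumably why the flag had to move up once the io conftest started being star-imported. A minimal standalone pattern, not the project's actual fixtures:

# conftest.py at the test root -- pytest ignores pytest_addoption in nested conftests.
import pytest

def pytest_addoption(parser):
    parser.addoption(
        "--credentials",
        action="store_true",
        help="Whether or not the current environment has access to remote storage credentials",
    )

@pytest.fixture
def has_credentials(request) -> bool:
    # Hypothetical fixture showing how a test would read the flag.
    return request.config.getoption("--credentials")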
2 changes: 1 addition & 1 deletion tests/cookbook/conftest.py
@@ -26,7 +26,7 @@ def daft_df(request, tmp_path):
papq.write_table(pacsv.read_csv(COOKBOOK_DATA_CSV), str(tmp_file))
df = daft.read_parquet(str(tmp_file))
else:
- assert False, f"Can only handle CSV/Parquet formats"
+ assert False, "Can only handle CSV/Parquet formats"
return df.select(*[col(c) for c in COLUMNS])


2 changes: 1 addition & 1 deletion tests/cookbook/test_dataloading.py
@@ -91,7 +91,7 @@ def test_glob_files(tmpdir):


def test_glob_files_single_file(tmpdir):
- filepath = pathlib.Path(tmpdir) / f"file.foo"
+ filepath = pathlib.Path(tmpdir) / "file.foo"
filepath.write_text("b" * 10)
daft_df = daft.from_glob_path(os.path.join(tmpdir, "file.foo"))
daft_pd_df = daft_df.to_pandas()
2 changes: 1 addition & 1 deletion tests/cookbook/test_image.py
@@ -61,7 +61,7 @@ def test_image_resize_mixed_modes():
resized_i = as_py[4]
assert np.all(resized_i == 10)

- assert as_py[-1] == None
+ assert as_py[-1] is None


def test_image_decode() -> None:
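The one-character change above is ruff's E711: == None goes through __eq__, which an object (or a NumPy array) may override, while is None is an identity check that cannot be fooled. A small illustration, unrelated to the image tests:

class Weird:
    def __eq__(self, other):
        return True  # claims to equal everything, including None

w = Weird()
print(w == None)  # noqa: E711 -- prints True here, which is misleading
print(w is None)  # False: identity check, always trustworthy

x = None
assert x is None  # the idiomatic form used in the fixed assertion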
2 changes: 1 addition & 1 deletion tests/cookbook/test_literals.py
@@ -46,7 +46,7 @@ def test_pyobj_literal_column(daft_df, service_requests_csv_pd_df):
assert_df_equals(daft_pd_df, service_requests_csv_pd_df)


- def test_literal_column_computation(daft_df, service_requests_csv_pd_df):
+ def test_literal_column_computation_apply(daft_df, service_requests_csv_pd_df):
"""Creating a new column that is derived from (1 + other_column) and retrieving the top N results"""
daft_df = daft_df.with_column(
"literal_col", lit({"foo": "bar"}).apply(lambda d: d["foo"], return_dtype=DataType.string())
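The rename matters because the module defined two tests with the same name (the same situation recurs in test_temporals.py below); Python keeps only the last binding, so the earlier test was silently never collected, which ruff reports as F811. A compact illustration:

def test_literal_column_computation():
    return "first definition"

def test_literal_column_computation():  # noqa: F811 -- shadows the function above
    return "second definition"

# Only the later binding survives, so a test runner would collect just one test here.
print(test_literal_column_computation())  # -> second definition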
4 changes: 2 additions & 2 deletions tests/cookbook/test_pandas_cookbook.py
@@ -122,7 +122,7 @@ def test_select_rows_closest_to_certain_value_using_argsort(repartition_nparts):
def test_splitting_by_row_index(repartition_nparts):
daft_df = daft.from_pydict(SELECTION_DATA).repartition(repartition_nparts)
pd_df = pd.DataFrame.from_dict(SELECTION_DATA)
- daft_df = daft_df.where((col("AAA") <= 6) & F.row_number().is_in([0, 2, 4]))
+ daft_df = daft_df.where((col("AAA") <= 6) & F.row_number().is_in([0, 2, 4])) # noqa: F821
pd_df = pd_df[(pd_df.AAA <= 6) & (pd_df.index.isin([0, 2, 4]))]
daft_pd_df = daft_df.to_pandas()
assert_df_equals(daft_pd_df, pd_df, sort_key="AAA")
@@ -132,7 +132,7 @@ def test_splitting_by_row_range(repartition_nparts):
def test_splitting_by_row_range(repartition_nparts):
daft_df = daft.from_pydict(SELECTION_DATA).repartition(repartition_nparts)
pd_df = pd.DataFrame.from_dict(SELECTION_DATA)
- daft_df = daft_df.where((F.row_number() >= 0) & (F.row_number() < 3))
+ daft_df = daft_df.where((F.row_number() >= 0) & (F.row_number() < 3)) # noqa: F821
pd_df = pd_df[0:3]
daft_pd_df = daft_df.to_pandas()
assert_df_equals(daft_pd_df, pd_df, sort_key="AAA")
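These two lines gain a scoped suppression instead of a fix: F is evidently not defined in a way ruff can see (F821, undefined name), and the # noqa: F821 comment silences exactly that rule on exactly that line while leaving every other check active. A sketch of the per-line noqa convention as ruff and flake8 generally treat it; the names here are invented and the function is deliberately never called so the module still runs:

import math  # noqa: F401 -- only the unused-import report is silenced on this line

def never_called():
    # F821 (undefined name) would fire on the next line; the scoped noqa
    # suppresses just that rule, and this function is never executed.
    return F.row_number()  # noqa: F821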
2 changes: 1 addition & 1 deletion tests/dataframe/test_creation.py
@@ -20,7 +20,7 @@
from daft.dataframe import DataFrame
from daft.datatype import DataType
from daft.utils import pyarrow_supports_fixed_shape_tensor
- from tests.conftest import *
+ from tests.conftest import UuidType

ARROW_VERSION = tuple(int(s) for s in pa.__version__.split(".") if s.isnumeric())

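Replacing the wildcard with the single name the module uses addresses F403: from tests.conftest import * hides where UuidType comes from, and every use of such a name shows up as possibly undefined (F405). A small self-contained demonstration using a standard-library module in place of tests.conftest:

from os.path import *  # noqa: F403 -- which names did this just bring in?
from os.path import join  # preferred: the one symbol actually needed, like UuidType in the diff

print(join("a", "b"))  # with the star import alone, a linter reports this name as F405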
3 changes: 0 additions & 3 deletions tests/dataframe/test_repr.py
@@ -166,9 +166,6 @@ def test_alias_repr(make_df):
["a", "b", "c"],
),
}
- expected_data_html = {
- **expected_data,
- }
assert parse_str_table(df.__repr__()) == expected_data
assert (
df._repr_html_()
8 changes: 4 additions & 4 deletions tests/dataframe/test_sort.py
@@ -46,8 +46,8 @@ def test_single_float_col_sort(make_df, desc: bool, n_partitions: int):
df = df.sort("A", desc=desc)
sorted_data = df.to_pydict()

- def _replace_nan_with_string(l):
- return ["nan" if isinstance(item, float) and math.isnan(item) else item for item in l]
+ def _replace_nan_with_string(items):
+ return ["nan" if isinstance(item, float) and math.isnan(item) else item for item in items]

expected = [1.0, 2.0, 3.0, float("nan"), None]
if desc:
@@ -69,8 +69,8 @@ def test_multi_float_col_sort(make_df, n_partitions: int):
df = df.sort(["A", "B"], desc=[True, False])
sorted_data = df.to_pydict()

- def _replace_nan_with_string(l):
- return ["nan" if isinstance(item, float) and math.isnan(item) else item for item in l]
+ def _replace_nan_with_string(items):
+ return ["nan" if isinstance(item, float) and math.isnan(item) else item for item in items]

expected = {
"A": [
5 changes: 4 additions & 1 deletion tests/dataframe/test_temporals.py
@@ -11,6 +11,9 @@
PYARROW_GE_7_0_0 = tuple(int(s) for s in pa.__version__.split(".") if s.isnumeric()) >= (7, 0, 0)


+ # TODO(Colin): these tests were being skipped!
+ # https://github.com/Eventual-Inc/Daft/issues/2098
+ @pytest.mark.skip()
def test_temporal_arithmetic() -> None:
now = datetime.now()
now_tz = datetime.now(timezone.utc)
@@ -151,7 +154,7 @@ def test_python_duration() -> None:
"timezone",
[None, "UTC"],
)
- def test_temporal_arithmetic(timeunit, timezone) -> None:
+ def test_temporal_arithmetic_parameterized(timeunit, timezone) -> None:
pa_table = pa.Table.from_pydict(
{
"timestamp": pa.array([1, 0, -1], pa.timestamp(timeunit, timezone)),
10 changes: 5 additions & 5 deletions tests/expressions/test_expressions.py
@@ -32,17 +32,17 @@
],
)
def test_make_lit(data, expected_dtype) -> None:
- l = lit(data)
- assert l.name() == "literal"
+ literal = lit(data)
+ assert literal.name() == "literal"
empty_table = MicroPartition.empty()
- lit_table = empty_table.eval_expression_list([l])
+ lit_table = empty_table.eval_expression_list([literal])
series = lit_table.get_column("literal")
assert series.datatype() == expected_dtype
- repr_out = repr(l)
+ repr_out = repr(literal)

assert repr_out.startswith("lit(")
assert repr_out.endswith(")")
- copied = copy.deepcopy(l)
+ copied = copy.deepcopy(literal)
assert repr_out == repr(copied)


2 changes: 1 addition & 1 deletion tests/expressions/test_expressions_projection.py
@@ -9,7 +9,7 @@

def test_expressions_projection_error_dup_name():
with pytest.raises(ValueError):
- ep = ExpressionsProjection(
+ ExpressionsProjection(
[
col("x"),
col("y").alias("x"),
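Dropping the ep = binding, like deleting the unused expected_data_html dict in test_repr.py above, addresses F841: a local assigned and never read. Inside pytest.raises only the constructor's side effect (the raise) matters, so the bare call is the honest form. A minimal reproduction with a hypothetical stand-in class, not the real ExpressionsProjection:

import pytest

class DuplicateName(ValueError):
    pass

class Projection:
    # Hypothetical stand-in that rejects duplicate column names.
    def __init__(self, names):
        if len(names) != len(set(names)):
            raise DuplicateName(f"duplicate names: {names}")

def test_duplicate_names_rejected():
    with pytest.raises(DuplicateName):
        Projection(["x", "x"])  # no `ep = ...`; only the raised error is asserted

if __name__ == "__main__":
    test_duplicate_names_rejected()
    print("ok")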
2 changes: 1 addition & 1 deletion tests/integration/iceberg/docker-compose/provision.py
@@ -316,7 +316,7 @@
)

spark.sql(
- f"""
+ """
INSERT INTO default.test_table_sanitized_character
VALUES
('123')
4 changes: 2 additions & 2 deletions tests/integration/iceberg/test_table_load.py
@@ -64,7 +64,7 @@ def test_daft_iceberg_table_collect_correct(table_name, local_iceberg_catalog):

@pytest.mark.integration()
def test_daft_iceberg_table_renamed_filtered_collect_correct(local_iceberg_catalog):
- tab = local_iceberg_catalog.load_table(f"default.test_table_rename")
+ tab = local_iceberg_catalog.load_table("default.test_table_rename")
df = daft.read_iceberg(tab)
df = df.where(df["idx_renamed"] <= 1)
daft_pandas = df.to_pandas()
@@ -75,7 +75,7 @@ def test_daft_iceberg_table_renamed_filtered_collect_correct(local_iceberg_catalog):

@pytest.mark.integration()
def test_daft_iceberg_table_renamed_column_pushdown_collect_correct(local_iceberg_catalog):
- tab = local_iceberg_catalog.load_table(f"default.test_table_rename")
+ tab = local_iceberg_catalog.load_table("default.test_table_rename")
df = daft.read_iceberg(tab)
df = df.select("idx_renamed")
daft_pandas = df.to_pandas()
2 changes: 1 addition & 1 deletion tests/integration/io/benchmarks/test_benchmark_glob.py
@@ -20,7 +20,7 @@ def generate_one_file_per_dir():
NUM_LEVELS = 4
FANOUT_PER_LEVEL = 10
return [
- "/".join(f"part_col_{i}={val}" for i, val in enumerate(part_vals)) + f"/0.parquet"
+ "/".join(f"part_col_{i}={val}" for i, val in enumerate(part_vals)) + "/0.parquet"
for part_vals in itertools.product([str(i) for i in range(FANOUT_PER_LEVEL)], repeat=NUM_LEVELS)
]

8 changes: 0 additions & 8 deletions tests/integration/io/conftest.py
@@ -19,14 +19,6 @@
YieldFixture = Generator[T, None, None]


- def pytest_addoption(parser):
- parser.addoption(
- "--credentials",
- action="store_true",
- help="Whether or not the current environment has access to remote storage credentials",
- )


###
# Config fixtures
###
10 changes: 5 additions & 5 deletions tests/integration/io/parquet/test_reads_s3_minio.py
@@ -14,8 +14,8 @@ def test_minio_parquet_bulk_readback(minio_io_config):
bucket_name = "data-engineering-prod"
with minio_create_bucket(minio_io_config, bucket_name=bucket_name) as fs:
target_paths = [
- f"s3://data-engineering-prod/Y/part-00000-51723f93-0ba2-42f1-a58f-154f0ed40f28.c000.snappy.parquet",
- f"s3://data-engineering-prod/Z/part-00000-6d5c7cc6-3b4a-443e-a46a-ca9e080bda1b.c000.snappy.parquet",
+ "s3://data-engineering-prod/Y/part-00000-51723f93-0ba2-42f1-a58f-154f0ed40f28.c000.snappy.parquet",
+ "s3://data-engineering-prod/Z/part-00000-6d5c7cc6-3b4a-443e-a46a-ca9e080bda1b.c000.snappy.parquet",
]
data = {"x": [1, 2, 3, 4]}
pa_table = pa.Table.from_pydict(data)
@@ -43,9 +43,9 @@ def test_minio_parquet_ignore_marker_files(minio_io_config):
bucket_name = "data-engineering-prod"
with minio_create_bucket(minio_io_config, bucket_name=bucket_name) as fs:
target_paths = [
- f"s3://data-engineering-prod/X/no_ext_parquet_metadata",
- f"s3://data-engineering-prod/Y/part-00000-51723f93-0ba2-42f1-a58f-154f0ed40f28.c000.snappy.parquet",
- f"s3://data-engineering-prod/Z/part-00000-6d5c7cc6-3b4a-443e-a46a-ca9e080bda1b.c000.snappy.parquet",
+ "s3://data-engineering-prod/X/no_ext_parquet_metadata",
+ "s3://data-engineering-prod/Y/part-00000-51723f93-0ba2-42f1-a58f-154f0ed40f28.c000.snappy.parquet",
+ "s3://data-engineering-prod/Z/part-00000-6d5c7cc6-3b4a-443e-a46a-ca9e080bda1b.c000.snappy.parquet",
]
data = {"x": [1, 2, 3, 4]}
pa_table = pa.Table.from_pydict(data)