diff --git a/README.md b/README.md
index 71e2944..dcf6265 100644
--- a/README.md
+++ b/README.md
@@ -36,13 +36,13 @@ pip install dask-deltatable
 import dask_deltatable as ddt
 
 # read delta table
-ddt.read_delta_table("delta_path")
+ddt.read_deltalake("delta_path")
 
 # with specific version
-ddt.read_delta_table("delta_path", version=3)
+ddt.read_deltalake("delta_path", version=3)
 
 # with specific datetime
-ddt.read_delta_table("delta_path", datetime="2018-12-19T16:39:57-08:00")
+ddt.read_deltalake("delta_path", datetime="2018-12-19T16:39:57-08:00")
 ```
 
 ### Accessing remote file systems
@@ -54,7 +54,7 @@ or config files. For AWS, you may need `~/.aws/credential`; for gcsfs,
 to configure these.
 
 ```python
-ddt.read_delta_table("s3://bucket_name/delta_path", version=3)
+ddt.read_deltalake("s3://bucket_name/delta_path", version=3)
 ```
 
 ### Accessing AWS Glue catalog
@@ -67,7 +67,7 @@ environment variables, and if those are not available, fall back to
 Example:
 
 ```python
-ddt.read_delta_table(catalog="glue", database_name="science", table_name="physics")
+ddt.read_deltalake(catalog="glue", database_name="science", table_name="physics")
 ```
 
 ### Inspecting Delta Table history
diff --git a/dask_deltatable/__init__.py b/dask_deltatable/__init__.py
index d799f39..ef4c1d8 100644
--- a/dask_deltatable/__init__.py
+++ b/dask_deltatable/__init__.py
@@ -1,8 +1,15 @@
 from __future__ import annotations
 
-__all__ = ["read_delta_history", "read_delta_table", "to_delta_table", "vacuum"]
+__all__ = [
+    "read_deltalake",
+    "to_deltalake",
+    "read_delta_table",
+    "read_delta_history",
+    "vacuum",
+]
 
 from .core import read_delta_history as read_delta_history
 from .core import read_delta_table as read_delta_table
+from .core import read_deltalake as read_deltalake
 from .core import vacuum as vacuum
-from .write import to_delta_table as to_delta_table
+from .write import to_deltalake as to_deltalake
diff --git a/dask_deltatable/core.py b/dask_deltatable/core.py
index 48c241a..24fc2e5 100644
--- a/dask_deltatable/core.py
+++ b/dask_deltatable/core.py
@@ -2,6 +2,7 @@
 
 import json
 import os
+import warnings
 from functools import partial
 from typing import Any
 from urllib.parse import urlparse
@@ -217,7 +218,7 @@ def _read_from_catalog(
     return df
 
 
-def read_delta_table(
+def read_deltalake(
     path: str | None = None,
     catalog: str | None = None,
     database_name: str | None = None,
@@ -292,7 +293,8 @@
 
     Examples
     --------
-    >>> df = dd.read_delta_table('s3://bucket/my-delta-table')  # doctest: +SKIP
+    >>> import dask_deltatable as ddt
+    >>> df = ddt.read_deltalake('s3://bucket/my-delta-table')  # doctest: +SKIP
 
     """
     if catalog is not None:
@@ -319,6 +321,17 @@
     return resultdf
 
 
+def read_delta_table(*args, **kwargs):
+    warnings.warn(
+        "`read_delta_table` was renamed, use `read_deltalake` instead",
+        category=DeprecationWarning,
+    )
+    return read_deltalake(*args, **kwargs)
+
+
+read_delta_table.__doc__ = read_deltalake.__doc__
+
+
 def read_delta_history(
     path: str,
     limit: int | None = None,
diff --git a/dask_deltatable/write.py b/dask_deltatable/write.py
index 229ad7a..84b45d3 100644
--- a/dask_deltatable/write.py
+++ b/dask_deltatable/write.py
@@ -33,7 +33,7 @@
 from ._schema import pyarrow_to_deltalake, validate_compatible
 
 
-def to_delta_table(
+def to_deltalake(
     table_or_uri: str | Path | DeltaTable,
     df: dd.DataFrame,
     *,
diff --git a/tests/test_core.py b/tests/test_core.py
index 25a5269..fc46bd5 100644
--- a/tests/test_core.py
+++ b/tests/test_core.py
@@ -69,7 +69,7 @@ def vacuum_table(tmpdir):
 
 
 def test_read_delta(simple_table):
-    df = ddt.read_delta_table(simple_table)
+    df = ddt.read_deltalake(simple_table)
 
     assert df.columns.tolist() == ["id", "count", "temperature", "newColumn"]
     assert df.compute().shape == (200, 4)
@@ -77,16 +77,16 @@ def test_read_delta(simple_table):
 
 def test_read_delta_with_different_versions(simple_table):
     print(simple_table)
-    df = ddt.read_delta_table(simple_table, version=0)
+    df = ddt.read_deltalake(simple_table, version=0)
     assert df.compute().shape == (100, 3)
 
-    df = ddt.read_delta_table(simple_table, version=1)
+    df = ddt.read_deltalake(simple_table, version=1)
     assert df.compute().shape == (200, 4)
 
 
 def test_row_filter(simple_table):
     # row filter
-    df = ddt.read_delta_table(
+    df = ddt.read_deltalake(
         simple_table,
         version=0,
         filter=[("count", ">", 30)],
@@ -95,17 +95,17 @@
 
 
 def test_different_columns(simple_table):
-    df = ddt.read_delta_table(simple_table, columns=["count", "temperature"])
+    df = ddt.read_deltalake(simple_table, columns=["count", "temperature"])
    assert df.columns.tolist() == ["count", "temperature"]
 
 
 def test_different_schema(simple_table):
     # testing schema evolution
-    df = ddt.read_delta_table(simple_table, version=0)
+    df = ddt.read_deltalake(simple_table, version=0)
     assert df.columns.tolist() == ["id", "count", "temperature"]
 
-    df = ddt.read_delta_table(simple_table, version=1)
+    df = ddt.read_deltalake(simple_table, version=1)
     assert df.columns.tolist() == ["id", "count", "temperature", "newColumn"]
 
 
 @pytest.mark.parametrize(
@@ -121,7 +121,7 @@
 )
 def test_partition_filter(partition_table, kwargs, shape):
     """partition filter"""
-    df = ddt.read_delta_table(partition_table, **kwargs)
+    df = ddt.read_deltalake(partition_table, **kwargs)
     filter_expr = pq.filters_to_expression(kwargs["filter"])
     dt = DeltaTable(partition_table, version=kwargs.get("version"))
     expected_partitions = len(
@@ -132,38 +132,38 @@ def test_partition_filter(partition_table, kwargs, shape):
 
 
 def test_empty(empty_table1, empty_table2):
-    df = ddt.read_delta_table(empty_table1, version=4)
+    df = ddt.read_deltalake(empty_table1, version=4)
     assert df.compute().shape == (0, 2)
 
-    df = ddt.read_delta_table(empty_table1, version=0)
+    df = ddt.read_deltalake(empty_table1, version=0)
     assert df.compute().shape == (5, 2)
 
     with pytest.raises(RuntimeError):
         # No Parquet files found
-        _ = ddt.read_delta_table(empty_table2)
+        _ = ddt.read_deltalake(empty_table2)
 
 
 def test_checkpoint(checkpoint_table):
-    df = ddt.read_delta_table(checkpoint_table, checkpoint=0, version=4)
+    df = ddt.read_deltalake(checkpoint_table, checkpoint=0, version=4)
     assert df.compute().shape[0] == 25
 
-    df = ddt.read_delta_table(checkpoint_table, checkpoint=10, version=12)
+    df = ddt.read_deltalake(checkpoint_table, checkpoint=10, version=12)
     assert df.compute().shape[0] == 65
 
-    df = ddt.read_delta_table(checkpoint_table, checkpoint=20, version=22)
+    df = ddt.read_deltalake(checkpoint_table, checkpoint=20, version=22)
     assert df.compute().shape[0] == 115
 
     with pytest.raises(Exception):
         # Parquet file with the given checkpoint 30 does not exists:
         # File {checkpoint_path} not found"
-        _ = ddt.read_delta_table(checkpoint_table, checkpoint=30, version=33)
+        _ = ddt.read_deltalake(checkpoint_table, checkpoint=30, version=33)
 
 
 def test_out_of_version_error(simple_table):
     # Cannot time travel Delta table to version 4 , Available versions for given
     # checkpoint 0 are [0,1]
     with pytest.raises(Exception):
-        _ = ddt.read_delta_table(simple_table, version=4)
+        _ = ddt.read_deltalake(simple_table, version=4)
 
 
 def test_load_with_datetime(simple_table2):
@@ -179,21 +179,21 @@ def test_load_with_datetime(simple_table2):
         file_path = os.path.join(log_dir, file_name)
         os.utime(file_path, (dt_epoch, dt_epoch))
 
-    expected = ddt.read_delta_table(simple_table2, version=0).compute()
-    result = ddt.read_delta_table(
+    expected = ddt.read_deltalake(simple_table2, version=0).compute()
+    result = ddt.read_deltalake(
         simple_table2, datetime="2020-05-01T00:47:31-07:00"
     ).compute()
     assert expected.equals(result)
     # assert_frame_equal(expected,result)
 
-    expected = ddt.read_delta_table(simple_table2, version=1).compute()
-    result = ddt.read_delta_table(
+    expected = ddt.read_deltalake(simple_table2, version=1).compute()
+    result = ddt.read_deltalake(
         simple_table2, datetime="2020-05-02T22:47:31-07:00"
     ).compute()
     assert expected.equals(result)
 
-    expected = ddt.read_delta_table(simple_table2, version=4).compute()
-    result = ddt.read_delta_table(
+    expected = ddt.read_deltalake(simple_table2, version=4).compute()
+    result = ddt.read_deltalake(
         simple_table2, datetime="2020-05-25T22:47:31-07:00"
     ).compute()
     assert expected.equals(result)
@@ -243,13 +243,13 @@ def test_vacuum(vacuum_table):
 
 def test_read_delta_with_error():
     with pytest.raises(ValueError) as exc_info:
-        ddt.read_delta_table()
+        ddt.read_deltalake()
     assert str(exc_info.value) == "Please Provide Delta Table path"
 
 
 def test_catalog_with_error():
     with pytest.raises(ValueError) as exc_info:
-        ddt.read_delta_table(catalog="glue")
+        ddt.read_deltalake(catalog="glue")
     assert (
         str(exc_info.value)
         == "Since Catalog was provided, please provide Database and table name"
@@ -267,7 +267,7 @@ def delta_mock(**kwargs):
     with patch("deltalake.DeltaTable.from_data_catalog", side_effect=delta_mock):
         os.environ["AWS_ACCESS_KEY_ID"] = "apple"
         os.environ["AWS_SECRET_ACCESS_KEY"] = "greatsecret"
-        df = ddt.read_delta_table(
+        df = ddt.read_deltalake(
             catalog="glue", database_name="stores", table_name="orders"
         )
         assert df.compute().shape == (200, 3)
diff --git a/tests/test_write.py b/tests/test_write.py
index 4e3150d..8ba7ee6 100644
--- a/tests/test_write.py
+++ b/tests/test_write.py
@@ -8,8 +8,8 @@
 from dask.dataframe.utils import assert_eq
 from dask.datasets import timeseries
 
-from dask_deltatable import read_delta_table
-from dask_deltatable.write import to_delta_table
+from dask_deltatable import read_deltalake
+from dask_deltatable.write import to_deltalake
 
 
 @pytest.mark.parametrize(
@@ -48,12 +48,12 @@ def test_roundtrip(tmpdir, with_index, freq, partition_freq):
     if with_index:
         ddf = ddf.set_index("timestamp")
 
-    out = to_delta_table(tmpdir, ddf, compute=False)
+    out = to_deltalake(tmpdir, ddf, compute=False)
     assert not os.listdir(tmpdir)
     out.compute()
     assert len(os.listdir(tmpdir)) > 0
 
-    ddf_read = read_delta_table(tmpdir)
+    ddf_read = read_deltalake(tmpdir)
     # FIXME: The index is not recovered
     if with_index:
         ddf = ddf.reset_index()
@@ -74,6 +74,6 @@ def test_datetime(tmpdir, unit):
     ddf = dd.from_pandas(df, npartitions=2)
-    out = to_delta_table(tmpdir, ddf)
+    out = to_deltalake(tmpdir, ddf)
     out.compute()
-    ddf_read = read_delta_table(tmpdir)
+    ddf_read = read_deltalake(tmpdir)
     # arrow reads back with ns
     assert ddf_read.ts.dtype == "datetime64[ns]"
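
As a quick end-to-end sanity check of the rename, here is a minimal usage sketch. It is not part of the diff itself: the `/tmp/demo-table` path and the toy DataFrame are illustrative, and the `compute=False` keyword is assumed from its use in `test_roundtrip` above.

```python
import warnings

import dask.dataframe as dd
import pandas as pd

import dask_deltatable as ddt

# Toy data; assumes /tmp/demo-table does not already exist.
ddf = dd.from_pandas(pd.DataFrame({"id": range(10)}), npartitions=2)

# New writer name (was to_delta_table); compute=False defers the write
# until .compute(), mirroring the usage in test_roundtrip.
out = ddt.to_deltalake("/tmp/demo-table", ddf, compute=False)
out.compute()

# New reader name (was read_delta_table).
print(ddt.read_deltalake("/tmp/demo-table").compute().shape)

# The old reader name still resolves, but now emits a DeprecationWarning
# through the shim added in dask_deltatable/core.py.
with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    ddt.read_delta_table("/tmp/demo-table")
assert any(issubclass(w.category, DeprecationWarning) for w in caught)
```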