ci: upgrade Ray to 2.6 and fix security dependabots #2403

Merged: 8 commits, Jul 31, 2023
2 changes: 1 addition & 1 deletion awswrangler/distributed/ray/_executor.py
@@ -26,7 +26,7 @@ def map(self, func: Callable[..., MapOutputType], _: Optional["BaseClient"], *ar
         return list(func(*arg) for arg in zip(itertools.repeat(None), *args))


-@ray.remote  # type: ignore[attr-defined]
+@ray.remote
 class AsyncActor:
     async def run_concurrent(self, func: Callable[..., MapOutputType], *args: Any) -> MapOutputType:
         return func(*args)
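For reference, an async Ray actor like the one above is driven through Ray's `.remote()` API. The sketch below is illustrative only: it redefines a similar actor under a different name rather than importing the library's internal class, and assumes a local Ray runtime.

from typing import Any, Callable, TypeVar

import ray

MapOutputType = TypeVar("MapOutputType")


@ray.remote
class DemoAsyncActor:  # modeled on the AsyncActor in _executor.py above
    async def run_concurrent(self, func: Callable[..., MapOutputType], *args: Any) -> MapOutputType:
        return func(*args)


ray.init(ignore_reinit_error=True)
actor = DemoAsyncActor.remote()
# Each call returns an ObjectRef; the async method lets the actor interleave requests.
refs = [actor.run_concurrent.remote(sum, [1, 2, 3]), actor.run_concurrent.remote(max, [1, 2, 3])]
print(ray.get(refs))  # [6, 3]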
4 changes: 2 additions & 2 deletions awswrangler/distributed/ray/_utils.py
@@ -18,8 +18,8 @@ def _estimate_avail_cpus(cur_pg: Optional[PlacementGroup]) -> int:
     Args:
         cur_pg: The current placement group, if any.
     """
-    cluster_cpus = int(ray.cluster_resources().get("CPU", 1))  # type: ignore[attr-defined]
-    cluster_gpus = int(ray.cluster_resources().get("GPU", 0))  # type: ignore[attr-defined]
+    cluster_cpus = int(ray.cluster_resources().get("CPU", 1))
+    cluster_gpus = int(ray.cluster_resources().get("GPU", 0))

     # If we're in a placement group, we shouldn't assume the entire cluster's
     # resources are available for us to use. Estimate an upper bound on what's
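As background, `_estimate_avail_cpus` caps the cluster-wide CPU count by the resources reserved in the current placement group. A simplified sketch of that idea, using the same Ray calls as the diff above but not the library's exact heuristic:

from typing import Optional

import ray
from ray.util.placement_group import PlacementGroup


def estimate_avail_cpus(cur_pg: Optional[PlacementGroup]) -> int:
    # Same Ray call as above; the type: ignore hints removed in this PR are no longer needed.
    cluster_cpus = int(ray.cluster_resources().get("CPU", 1))
    if cur_pg is None:
        return cluster_cpus
    # Inside a placement group, assume only the CPUs reserved by its bundles are available.
    pg_cpus = int(sum(bundle.get("CPU", 0) for bundle in cur_pg.bundle_specs))
    return max(1, min(cluster_cpus, pg_cpus))


if __name__ == "__main__":
    ray.init(ignore_reinit_error=True)
    print(estimate_avail_cpus(None))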
@@ -209,7 +209,7 @@ def __init__(
         import ray
         from ray.util.scheduling_strategies import NodeAffinitySchedulingStrategy

-        self._local_scheduling = NodeAffinitySchedulingStrategy(ray.get_runtime_context().get_node_id(), soft=False)  # type: ignore[attr-defined]
+        self._local_scheduling = NodeAffinitySchedulingStrategy(ray.get_runtime_context().get_node_id(), soft=False)

         dataset_kwargs = reader_args.pop("dataset_kwargs", {})
         try:
@@ -225,7 +225,7 @@ def __init__(
             # Try to infer dataset schema by passing dummy table through UDF.
             dummy_table = schema.empty_table()
             try:
-                inferred_schema = _block_udf(dummy_table).schema  # type: ignore[union-attr]
+                inferred_schema = _block_udf(dummy_table).schema
                 inferred_schema = inferred_schema.with_metadata(schema.metadata)
             except Exception:  # pylint: disable=broad-except
                 _logger.debug(
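For reference, `NodeAffinitySchedulingStrategy(node_id, soft=False)` pins Ray work to a specific node, here the node that created the datasource. A small self-contained illustration of the same pattern (not code from this PR):

import ray
from ray.util.scheduling_strategies import NodeAffinitySchedulingStrategy

ray.init(ignore_reinit_error=True)

# soft=False means the task fails to schedule rather than falling back to another node.
local_only = NodeAffinitySchedulingStrategy(ray.get_runtime_context().get_node_id(), soft=False)


@ray.remote(scheduling_strategy=local_only)
def node_of_task() -> str:
    return ray.get_runtime_context().get_node_id()


assert ray.get(node_of_task.remote()) == ray.get_runtime_context().get_node_id()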
4 changes: 2 additions & 2 deletions awswrangler/s3/_read_deltalake.py
@@ -33,7 +33,7 @@ def _set_default_storage_options_kwargs(
 @_utils.check_optional_dependency(deltalake, "deltalake")
 @apply_configs
 def read_deltalake(
-    path: Optional[str] = None,
+    path: str,
     version: Optional[int] = None,
     partitions: Optional[List[Tuple[str, str, Any]]] = None,
     columns: Optional[List[str]] = None,
@@ -54,7 +54,7 @@

     Parameters
     ----------
-    path: Optional[str]
+    path: str
         The path of the DeltaTable.
     version: Optional[int]
         The version of the DeltaTable.
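With `path` now a required argument, a call to the function looks roughly like this (bucket, table path, and column names are placeholders; the keyword arguments come from the signature shown above):

import awswrangler as wr

df = wr.s3.read_deltalake(
    path="s3://my-bucket/my-delta-table/",  # required as of this change
    version=1,                              # optional: read a specific table version
    columns=["id", "value"],                # optional: column projection
)
print(df.shape)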
507 changes: 269 additions & 238 deletions poetry.lock

Large diffs are not rendered by default.

8 changes: 4 additions & 4 deletions pyproject.toml
@@ -30,7 +30,7 @@ python = ">=3.8, <4.0"
 # Required
 boto3 = "^1.20.32"
 botocore = "^1.23.32"
-pandas = ">=1.2.0,!=1.5.0,<3.0.0"  # Exclusion per: https://github.com/aws/aws-sdk-pandas/issues/1678
+pandas = ">=1.2.0,<3.0.0"
 numpy = "^1.18"
 pyarrow = ">=7.0.0"
 typing-extensions = "^4.4.0"
@@ -56,11 +56,11 @@ jsonpath-ng = { version = "^1.5.3", optional = true }
 # Other
 openpyxl = { version = "^3.0.0", optional = true }
 progressbar2 = { version = "^4.0.0", optional = true }
-deltalake = { version = ">=0.6.4,<0.10.0", optional = true }
+deltalake = { version = ">=0.6.4,<0.11.0", optional = true }

 # Distributed
-modin = { version = "^0.22.2", optional = true }
-ray = { version = ">=2.0.0,<2.6.0", extras = ["default", "data"], optional = true }
+modin = { version = "^0.23.0", optional = true }
+ray = { version = ">=2.0.0,<2.7.0", extras = ["default", "data"], optional = true }

 [tool.poetry.extras]
 redshift = ["redshift-connector"]
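A quick way to check that an installed environment satisfies the widened pins above; the version ranges are copied from the diff, and the script itself is illustrative only:

from importlib.metadata import PackageNotFoundError, version

# Expected ranges come from the pyproject.toml diff above.
expected = {"ray": ">=2.0.0,<2.7.0", "modin": "^0.23.0", "deltalake": ">=0.6.4,<0.11.0", "pandas": ">=1.2.0,<3.0.0"}
for pkg, constraint in expected.items():
    try:
        print(f"{pkg}=={version(pkg)}  (pinned {constraint})")
    except PackageNotFoundError:
        print(f"{pkg} not installed")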
27 changes: 17 additions & 10 deletions tests/unit/test_athena.py
@@ -136,6 +136,7 @@ def test_athena_ctas(path, path2, path3, glue_table, glue_table2, glue_database,
     assert len(wr.s3.list_objects(path=path3)) == 0


+@pytest.mark.modin_index
 def test_athena_read_sql_ctas_bucketing(path, path2, glue_table, glue_table2, glue_database, glue_ctas_database):
     df = pd.DataFrame({"c0": [0, 1], "c1": ["foo", "bar"]})
     wr.s3.to_parquet(
@@ -155,12 +155,14 @@ def test_athena_read_sql_ctas_bucketing(path, path2, glue_table, glue_table2, gl
             bucketing_info=(["c0"], 1),
         ),
         s3_output=path2,
+        pyarrow_additional_kwargs={"ignore_metadata": True},
     )
     df_no_ctas = wr.athena.read_sql_query(
         sql=f"SELECT * FROM {glue_table}",
         ctas_approach=False,
         database=glue_database,
         s3_output=path2,
+        pyarrow_additional_kwargs={"ignore_metadata": True},
     )
     assert df_ctas.equals(df_no_ctas)

@@ -855,6 +858,7 @@ def test_bucketing_catalog_parquet_table(path, glue_database, glue_table):
     assert table["StorageDescriptor"]["BucketColumns"] == bucket_cols


+@pytest.mark.modin_index
 @pytest.mark.parametrize("bucketing_data", [[0, 1, 2], [False, True, False], ["b", "c", "d"]])
 @pytest.mark.parametrize(
     "dtype",
@@ -907,12 +911,12 @@ def test_bucketing_parquet_dataset(path, glue_database, glue_table, bucketing_da
     if isinstance(bucketing_data[0], str):
         dtype = pd.StringDtype()

-    first_bucket_df = wr.s3.read_parquet(path=[r["paths"][0]])
+    first_bucket_df = wr.s3.read_parquet(path=[r["paths"][0]], pyarrow_additional_kwargs={"ignore_metadata": True})
     assert len(first_bucket_df) == 2
     assert pandas_equals(pd.Series([bucketing_data[0], bucketing_data[2]], dtype=dtype), first_bucket_df["c0"])
     assert pandas_equals(pd.Series(["foo", "baz"], dtype=pd.StringDtype()), first_bucket_df["c1"])

-    second_bucket_df = wr.s3.read_parquet(path=[r["paths"][1]])
+    second_bucket_df = wr.s3.read_parquet(path=[r["paths"][1]], pyarrow_additional_kwargs={"ignore_metadata": True})
     assert len(second_bucket_df) == 1
     assert pandas_equals(pd.Series([bucketing_data[1]], dtype=dtype), second_bucket_df["c0"])
     assert pandas_equals(pd.Series(["bar"], dtype=pd.StringDtype()), second_bucket_df["c1"])
@@ -943,6 +947,7 @@ def test_bucketing_catalog_csv_table(path, glue_database, glue_table):
     assert table["StorageDescriptor"]["BucketColumns"] == bucket_cols


+@pytest.mark.modin_index
 @pytest.mark.parametrize("bucketing_data", [[0, 1, 2], [False, True, False], ["b", "c", "d"]])
 @pytest.mark.parametrize(
     "dtype",
@@ -988,12 +993,12 @@ def test_bucketing_csv_dataset(path, glue_database, glue_table, bucketing_data,
     assert r["paths"][0].endswith("bucket-00000.csv")
     assert r["paths"][1].endswith("bucket-00001.csv")

-    first_bucket_df = wr.s3.read_csv(path=[r["paths"][0]], header=None, names=["c0", "c1"])
+    first_bucket_df = wr.s3.read_csv(path=[r["paths"][0]], header=None, names=["c0", "c1"]).reset_index(drop=True)
     assert len(first_bucket_df) == 2
     assert pandas_equals(pd.Series([bucketing_data[0], bucketing_data[2]]), first_bucket_df["c0"])
     assert pandas_equals(pd.Series(["foo", "baz"]), first_bucket_df["c1"])

-    second_bucket_df = wr.s3.read_csv(path=[r["paths"][1]], header=None, names=["c0", "c1"])
+    second_bucket_df = wr.s3.read_csv(path=[r["paths"][1]], header=None, names=["c0", "c1"]).reset_index(drop=True)
     assert len(second_bucket_df) == 1
     assert pandas_equals(pd.Series([bucketing_data[1]]), second_bucket_df["c0"])
     assert pandas_equals(pd.Series(["bar"]), second_bucket_df["c1"])
@@ -1008,6 +1013,7 @@ def test_bucketing_csv_dataset(path, glue_database, glue_table, bucketing_data,
     assert all(x in bucketing_data for x in loaded_df["c0"].to_list())


+@pytest.mark.modin_index
 @pytest.mark.parametrize("bucketing_data", [[0, 1, 2, 3], [False, True, False, True], ["b", "c", "d", "e"]])
 def test_combined_bucketing_partitioning_parquet_dataset(path, glue_database, glue_table, bucketing_data):
     nb_of_buckets = 2
@@ -1045,22 +1051,22 @@ def test_combined_bucketing_partitioning_parquet_dataset(path, glue_database, gl
     if isinstance(bucketing_data[0], str):
         dtype = pd.StringDtype()

-    bucket_df = wr.s3.read_parquet(path=[r["paths"][0]])
+    bucket_df = wr.s3.read_parquet(path=[r["paths"][0]], pyarrow_additional_kwargs={"ignore_metadata": True})
     assert len(bucket_df) == 1
     assert pandas_equals(pd.Series([bucketing_data[0]], dtype=dtype), bucket_df["c0"])
     assert pandas_equals(pd.Series(["foo"], dtype=pd.StringDtype()), bucket_df["c1"])

-    bucket_df = wr.s3.read_parquet(path=[r["paths"][1]])
+    bucket_df = wr.s3.read_parquet(path=[r["paths"][1]], pyarrow_additional_kwargs={"ignore_metadata": True})
     assert len(bucket_df) == 1
     assert pandas_equals(pd.Series([bucketing_data[1]], dtype=dtype), bucket_df["c0"])
     assert pandas_equals(pd.Series(["bar"], dtype=pd.StringDtype()), bucket_df["c1"])

-    bucket_df = wr.s3.read_parquet(path=[r["paths"][2]])
+    bucket_df = wr.s3.read_parquet(path=[r["paths"][2]], pyarrow_additional_kwargs={"ignore_metadata": True})
     assert len(bucket_df) == 1
     assert pandas_equals(pd.Series([bucketing_data[2]], dtype=dtype), bucket_df["c0"])
     assert pandas_equals(pd.Series(["baz"], dtype=pd.StringDtype()), bucket_df["c1"])

-    bucket_df = wr.s3.read_parquet(path=[r["paths"][3]])
+    bucket_df = wr.s3.read_parquet(path=[r["paths"][3]], pyarrow_additional_kwargs={"ignore_metadata": True})
     assert len(bucket_df) == 1
     assert pandas_equals(pd.Series([bucketing_data[3]], dtype=dtype), bucket_df["c0"])
     assert pandas_equals(pd.Series(["boo"], dtype=pd.StringDtype()), bucket_df["c1"])
@@ -1135,6 +1141,7 @@ def test_combined_bucketing_partitioning_csv_dataset(path, glue_database, glue_t
     assert all(x in bucketing_data for x in loaded_df["c0"].to_list())


+@pytest.mark.modin_index
 def test_multiple_bucketing_columns_parquet_dataset(path, glue_database, glue_table):
     nb_of_buckets = 2
     df = pd.DataFrame({"c0": [0, 1, 2, 3], "c1": [4, 6, 5, 7], "c2": ["foo", "bar", "baz", "boo"]})
@@ -1152,13 +1159,13 @@ def test_multiple_bucketing_columns_parquet_dataset(path, glue_database, glue_ta
     assert r["paths"][0].endswith("bucket-00000.snappy.parquet")
     assert r["paths"][1].endswith("bucket-00001.snappy.parquet")

-    first_bucket_df = wr.s3.read_parquet(path=[r["paths"][0]])
+    first_bucket_df = wr.s3.read_parquet(path=[r["paths"][0]], pyarrow_additional_kwargs={"ignore_metadata": True})
     assert len(first_bucket_df) == 2
     assert pandas_equals(pd.Series([0, 3], dtype=pd.Int64Dtype()), first_bucket_df["c0"])
     assert pandas_equals(pd.Series([4, 7], dtype=pd.Int64Dtype()), first_bucket_df["c1"])
     assert pandas_equals(pd.Series(["foo", "boo"], dtype=pd.StringDtype()), first_bucket_df["c2"])

-    second_bucket_df = wr.s3.read_parquet(path=[r["paths"][1]])
+    second_bucket_df = wr.s3.read_parquet(path=[r["paths"][1]], pyarrow_additional_kwargs={"ignore_metadata": True})
     assert len(second_bucket_df) == 2
     assert pandas_equals(pd.Series([1, 2], dtype=pd.Int64Dtype()), second_bucket_df["c0"])
     assert pandas_equals(pd.Series([6, 5], dtype=pd.Int64Dtype()), second_bucket_df["c1"])
3 changes: 2 additions & 1 deletion tests/unit/test_athena_csv.py
@@ -372,6 +372,7 @@ def test_athena_csv_types(path, glue_database, glue_table):
     ensure_data_types_csv(df2)


+@pytest.mark.modin_index
 @pytest.mark.parametrize("use_threads", [True, False])
 @pytest.mark.parametrize("ctas_approach", [True, False])
 @pytest.mark.parametrize("line_count", [1, 2])
@@ -388,7 +389,7 @@ def test_skip_header(path, glue_database, glue_table, use_threads, ctas_approach
         skip_header_line_count=line_count,
     )
     df2 = wr.athena.read_sql_table(glue_table, glue_database, use_threads=use_threads, ctas_approach=ctas_approach)
-    assert df.iloc[line_count - 1 :].reset_index(drop=True).equals(df2)
+    assert df.iloc[line_count - 1 :].reset_index(drop=True).equals(df2.reset_index(drop=True))


 @pytest.mark.parametrize("use_threads", [True, False])
10 changes: 8 additions & 2 deletions tests/unit/test_athena_parquet.py
@@ -580,6 +580,7 @@ def test_schema_evolution_disabled(path, glue_table, glue_database):
     assert df2.c0.sum() == 3


+@pytest.mark.modin_index
 def test_date_cast(path, glue_table, glue_database):
     df = pd.DataFrame(
         {
@@ -614,9 +615,14 @@ def test_date_cast(path, glue_table, glue_database):
         }
     )
     wr.s3.to_parquet(df=df, path=path, dataset=True, database=glue_database, table=glue_table, dtype={"c0": "date"})
-    df2 = wr.s3.read_parquet(path=path)
+    df2 = wr.s3.read_parquet(path=path, pyarrow_additional_kwargs={"ignore_metadata": True})
     assert pandas_equals(df_expected, df2)
-    df3 = wr.athena.read_sql_table(database=glue_database, table=glue_table, ctas_approach=False)
+    df3 = wr.athena.read_sql_table(
+        database=glue_database,
+        table=glue_table,
+        ctas_approach=False,
+        pyarrow_additional_kwargs={"ignore_metadata": True},
+    )
     assert pandas_equals(df_expected, df3)

20 changes: 16 additions & 4 deletions tests/unit/test_s3_parquet.py
@@ -433,6 +433,12 @@ def test_range_index_recovery_simple(path, use_threads):
     assert_pandas_equals(df.reset_index(level=0), df2.reset_index(level=0))


+@pytest.mark.modin_index
+@pytest.mark.xfail(
+    raises=AssertionError,
+    reason="https://github.com/ray-project/ray/issues/37771",
+    condition=is_ray_modin,
+)
 @pytest.mark.parametrize("use_threads", [True, False, 2])
 @pytest.mark.parametrize("name", [None, "foo"])
 def test_range_index_recovery_pandas(path, use_threads, name):
@@ -478,8 +484,9 @@ def test_multi_index_recovery_nameless(path, use_threads):
     assert_pandas_equals(df.reset_index(), df2.reset_index())


+@pytest.mark.modin_index
 @pytest.mark.xfail(
-    raises=wr.exceptions.InvalidArgumentCombination,
+    raises=(wr.exceptions.InvalidArgumentCombination, AssertionError),
     reason="Named index not working when partitioning to a single file",
     condition=is_ray_modin,
 )
@@ -544,23 +551,27 @@ def test_to_parquet_dataset_sanitize(path):
     assert df2.par.to_list() == ["a", "b"]


+@pytest.mark.modin_index
 @pytest.mark.parametrize("use_threads", [False, True, 2])
 def test_timezone_file(path, use_threads):
     file_path = f"{path}0.parquet"
     df = pd.DataFrame({"c0": [datetime.utcnow(), datetime.utcnow()]})
     df["c0"] = pd.DatetimeIndex(df.c0).tz_localize(tz="US/Eastern")
     df.to_parquet(file_path)
-    df2 = wr.s3.read_parquet(path, use_threads=use_threads)
+    df2 = wr.s3.read_parquet(path, use_threads=use_threads, pyarrow_additional_kwargs={"ignore_metadata": True})
     assert_pandas_equals(df, df2)


+@pytest.mark.modin_index
 @pytest.mark.parametrize("use_threads", [True, False, 2])
 def test_timezone_file_columns(path, use_threads):
     file_path = f"{path}0.parquet"
     df = pd.DataFrame({"c0": [datetime.utcnow(), datetime.utcnow()], "c1": [1.1, 2.2]})
     df["c0"] = pd.DatetimeIndex(df.c0).tz_localize(tz="US/Eastern")
     df.to_parquet(file_path)
-    df2 = wr.s3.read_parquet(path, columns=["c1"], use_threads=use_threads)
+    df2 = wr.s3.read_parquet(
+        path, columns=["c1"], use_threads=use_threads, pyarrow_additional_kwargs={"ignore_metadata": True}
+    )
     assert_pandas_equals(df[["c1"]], df2)


@@ -620,12 +631,13 @@ def test_mixed_types_column(path) -> None:
         wr.s3.to_parquet(df, path, dataset=True, partition_cols=["par"])


+@pytest.mark.modin_index
 @pytest.mark.parametrize("compression", [None, "snappy", "gzip", "zstd"])
 def test_parquet_compression(path, compression) -> None:
     df = pd.DataFrame({"id": [1, 2, 3]}, dtype="Int64")
     path_file = f"{path}0.parquet"
     wr.s3.to_parquet(df=df, path=path_file, compression=compression)
-    df2 = wr.s3.read_parquet([path_file])
+    df2 = wr.s3.read_parquet([path_file], pyarrow_additional_kwargs={"ignore_metadata": True})
     assert_pandas_equals(df, df2)

5 changes: 5 additions & 0 deletions tests/unit/test_s3_select.py
@@ -67,6 +67,11 @@ def test_full_table(path, use_threads):
     assert df.shape == df4.shape


+@pytest.mark.xfail(
+    raises=AssertionError,
+    reason="https://github.com/ray-project/ray/issues/37928",
+    condition=is_ray_modin,
+)
 @pytest.mark.parametrize("use_threads", [True, False, 2])
 def test_push_down(path, use_threads):
     df = pd.DataFrame({"c0": [1, 2, 3], "c1": ["foo", "boo", "bar"], "c2": [4.0, 5.0, 6.0]})
9 changes: 8 additions & 1 deletion tests/unit/test_s3_text.py
@@ -188,6 +188,12 @@ def test_csv_dataset_header_modes(path, mode, glue_database, glue_table):
     assert df_res.equals(dfs[-1])


+@pytest.mark.modin_index
+@pytest.mark.xfail(
+    raises=AssertionError,
+    reason="https://github.com/ray-project/ray/issues/37771",
+    condition=is_ray_modin,
+)
 def test_json(path):
     df0 = pd.DataFrame({"id": [1, 2, 3]})
     path0 = f"{path}test_json0.json"
@@ -354,6 +360,7 @@ def test_csv_line_terminator(path, line_terminator):
     assert df.equals(df2)


+@pytest.mark.modin_index
 def test_read_json_versioned(path) -> None:
     path_file = f"{path}0.json"
     dfs = [
@@ -368,7 +375,7 @@ def test_read_json_versioned(path) -> None:
         version_ids.append(version_id)

     for df, version_id in zip(dfs, version_ids):
-        df_temp = wr.s3.read_json(path_file, version_id=version_id)
+        df_temp = wr.s3.read_json(path_file, version_id=version_id).reset_index(drop=True)
         assert df_temp.equals(df)
         assert version_id == wr.s3.describe_objects(path=path_file, version_id=version_id)[path_file]["VersionId"]

1 change: 1 addition & 0 deletions tox.ini
@@ -29,6 +29,7 @@ passenv =
     AWS_SESSION_TOKEN
 setenv =
     COV_FAIL_UNDER = 74.00
+    WR_CPU_COUNT = 16
 allowlist_externals = poetry
 commands_pre =
     poetry install --no-root --sync --all-extras