From eb59802949291e07cdd903e7b947f37e022ade5f Mon Sep 17 00:00:00 2001
From: Yun Wu
Date: Thu, 20 Jun 2024 15:09:26 +0800
Subject: [PATCH] add unittest

---
 .../tasks/transformation/_column_mapping.py   | 37 ++++++++++--
 tests/tasks/test_column_mapping.py            | 59 +++++++++++++++++++
 2 files changed, 90 insertions(+), 6 deletions(-)
 create mode 100644 tests/tasks/test_column_mapping.py

diff --git a/ecoscope_workflows/tasks/transformation/_column_mapping.py b/ecoscope_workflows/tasks/transformation/_column_mapping.py
index 18dc0616c..60c70de82 100644
--- a/ecoscope_workflows/tasks/transformation/_column_mapping.py
+++ b/ecoscope_workflows/tasks/transformation/_column_mapping.py
@@ -9,15 +9,40 @@
 @distributed
 def map_columns(
     df: DataFrame[JsonSerializableDataFrameModel],
-    drop_columns: Annotated[list[str], Field()],
-    retain_columns: Annotated[list[str], Field()],
-    rename_columns: Annotated[dict[str, str], Field()],
+    drop_columns: Annotated[list[str], Field(default=[], description="List of columns to drop.")],
+    retain_columns: Annotated[
+        list[str], Field(default=[], description="List of columns to retain. The order of columns will be preserved.")
+    ],
+    rename_columns: Annotated[
+        dict[str, str],
+        Field(default={}, description="Dictionary of columns to rename."),
+    ],
 ) -> DataFrame[JsonSerializableDataFrameModel]:
+    """
+    Maps and transforms the columns of a DataFrame based on the provided parameters. The order of the operations is as
+    follows: drop columns, retain/reorder columns, and rename columns.
 
-    df.drop(columns=drop_columns, inplace=True)
+    Args:
+        df (DataFrame[JsonSerializableDataFrameModel]): The input DataFrame to be transformed.
+        drop_columns (list[str]): List of columns to drop from the DataFrame.
+        retain_columns (list[str]): List of columns to retain. The order of columns will be preserved.
+        rename_columns (dict[str, str]): Dictionary of columns to rename.
+
+    Returns:
+        DataFrame[JsonSerializableDataFrameModel]: The transformed DataFrame.
+
+    Raises:
+        KeyError: If any of the columns specified are not found in the DataFrame.
+    """
+
+    df = df.drop(columns=drop_columns)
     if retain_columns:
-        df.reindex(columns=[retain_columns], inplace=True)
+        if any(col not in df.columns for col in retain_columns):
+            raise KeyError(f"Columns {retain_columns} not found in DataFrame.")
+        df = df.reindex(columns=retain_columns)
     if rename_columns:
-        df.rename(columns=rename_columns, inplace=True)
+        if any(col not in df.columns for col in rename_columns):
+            raise KeyError(f"Columns {list(rename_columns)} not found in DataFrame.")
+        df = df.rename(columns=rename_columns)
 
     return df
diff --git a/tests/tasks/test_column_mapping.py b/tests/tasks/test_column_mapping.py
new file mode 100644
index 000000000..b7924cc4b
--- /dev/null
+++ b/tests/tasks/test_column_mapping.py
@@ -0,0 +1,59 @@
+import pandas as pd
+import pytest
+
+from ecoscope_workflows.tasks.transformation import map_columns
+
+
+@pytest.fixture
+def sample_dataframe():
+    """Fixture to provide a sample DataFrame for testing."""
+    data = {"A": [1, 2, 3], "B": [4, 5, 6], "C": [7, 8, 9]}
+    return pd.DataFrame(data)
+
+
+def test_drop_columns(sample_dataframe):
+    """Test that columns are correctly dropped."""
+    result_df = map_columns(sample_dataframe, drop_columns=["A"], retain_columns=[], rename_columns={})
+    assert "A" not in result_df.columns
+
+
+def test_drop_columns_error(sample_dataframe):
+    """Test raising error if a column does not exist."""
+    with pytest.raises(KeyError):
+        map_columns(sample_dataframe, drop_columns=["NOT_EXIST"], retain_columns=[], rename_columns={})
+
+
+def test_retain_columns(sample_dataframe):
+    """Test that only specified columns are retained."""
+    result_df = map_columns(sample_dataframe, drop_columns=[], retain_columns=["B"], rename_columns={})
+    assert list(result_df.columns) == ["B"]
+
+
+def test_reorder_columns(sample_dataframe):
+    """Test that retained columns follow the requested order."""
+    result_df = map_columns(sample_dataframe, drop_columns=[], retain_columns=["B", "A"], rename_columns={})
+    assert list(result_df.columns) == ["B", "A"]
+
+
+def test_retain_columns_error(sample_dataframe):
+    """Test raising error if a column does not exist."""
+    with pytest.raises(KeyError):
+        map_columns(sample_dataframe, drop_columns=[], retain_columns=["NOT_EXIST"], rename_columns={})
+
+
+def test_rename_columns(sample_dataframe):
+    """Test that columns are correctly renamed."""
+    result_df = map_columns(sample_dataframe, drop_columns=[], retain_columns=[], rename_columns={"B": "Z"})
+    assert "Z" in result_df.columns and "B" not in result_df.columns
+
+
+def test_rename_columns_error(sample_dataframe):
+    """Test raising error if a column does not exist."""
+    with pytest.raises(KeyError):
+        map_columns(sample_dataframe, drop_columns=[], retain_columns=[], rename_columns={"NOT_EXIST": "Z"})
+
+
+def test_map_columns(sample_dataframe):
+    """Test that columns are correctly mapped."""
+    result_df = map_columns(sample_dataframe, drop_columns=["C"], retain_columns=["B"], rename_columns={"B": "Z"})
+    assert list(result_df.columns) == ["Z"]