Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create ETL task to map columns #36

Merged
merged 6 commits into from
Jun 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions ecoscope_workflows/tasks/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,4 @@
from . import io as io
from . import preprocessing as preprocessing
from . import results as results
from . import transformation as transformation
5 changes: 5 additions & 0 deletions ecoscope_workflows/tasks/transformation/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
from ._column_mapping import map_columns

__all__ = [
"map_columns",
]
67 changes: 67 additions & 0 deletions ecoscope_workflows/tasks/transformation/_column_mapping.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
import logging
from typing import Annotated

from pydantic import Field

from ecoscope_workflows.annotations import DataFrame, JsonSerializableDataFrameModel
from ecoscope_workflows.decorators import distributed

logger = logging.getLogger(__name__)


@distributed
def map_columns(
df: DataFrame[JsonSerializableDataFrameModel],
drop_columns: Annotated[
list[str], Field(default=[], description="List of columns to drop.")
],
retain_columns: Annotated[
list[str],
Field(
default=[],
description="""List of columns to retain with the order specified by the list.
"Keep all the columns if the list is empty.""",
),
],
rename_columns: Annotated[
dict[str, str],
Field(default={}, description="Dictionary of columns to rename."),
],
) -> DataFrame[JsonSerializableDataFrameModel]:
"""
Maps and transforms the columns of a DataFrame based on the provided parameters. The order of the operations is as
follows: drop columns, retain/reorder columns, and rename columns.

Args:
df (DataFrame[JsonSerializableDataFrameModel]): The input DataFrame to be transformed.
drop_columns (list[str]): List of columns to drop from the DataFrame.
retain_columns (list[str]): List of columns to retain. The order of columns will be preserved.
rename_columns (dict[str, str]): Dictionary of columns to rename.

Returns:
DataFrame[JsonSerializableDataFrameModel]: The transformed DataFrame.

Raises:
KeyError: If any of the columns specified are not found in the DataFrame.
"""
if "geometry" in drop_columns:
logger.warning(
"'geometry' found in drop_columns, which may affect spatial operations."
)

if "geometry" in rename_columns.keys():
logger.warning(
"'geometry' found in rename_columns, which may affect spatial operations."
)

df = df.drop(columns=drop_columns)
if retain_columns:
if any(col not in df.columns for col in retain_columns):
raise KeyError(f"Columns {retain_columns} not all found in DataFrame.")
df = df.reindex(columns=retain_columns)
if rename_columns:
if any(col not in df.columns for col in rename_columns):
raise KeyError(f"Columns {rename_columns} not all found in DataFrame.")
df = df.rename(columns=rename_columns)

return df
87 changes: 87 additions & 0 deletions tests/tasks/test_column_mapping.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
import pandas as pd
import pytest

from ecoscope_workflows.tasks.transformation import map_columns


@pytest.fixture
def sample_dataframe():
"""Fixture to provide a sample DataFrame for testing."""
data = {"A": [1, 2, 3], "B": [4, 5, 6], "C": [7, 8, 9]}
return pd.DataFrame(data)


def test_drop_columns(sample_dataframe):
"""Test that columns are correctly dropped."""
result_df = map_columns(
sample_dataframe, drop_columns=["A"], retain_columns=[], rename_columns={}
)
assert "A" not in result_df.columns


def test_drop_columns_error(sample_dataframe):
"""Test raising error if a column does not exist."""
with pytest.raises(KeyError):
map_columns(
sample_dataframe,
drop_columns=["NOT_EXIST"],
retain_columns=[],
rename_columns={},
)


def test_retain_columns(sample_dataframe):
"""Test that only specified columns are retained."""
result_df = map_columns(
sample_dataframe, drop_columns=[], retain_columns=["B"], rename_columns={}
)
assert list(result_df.columns) == ["B"]


def test_reorder_columns(sample_dataframe):
"""Test that only specified columns are retained."""
result_df = map_columns(
sample_dataframe, drop_columns=[], retain_columns=["B", "A"], rename_columns={}
)
assert list(result_df.columns) == ["B", "A"]


def test_retain_columns_error(sample_dataframe):
"""Test raising error if a column does not exist."""
with pytest.raises(KeyError):
map_columns(
sample_dataframe,
drop_columns=[],
retain_columns=["NOT_EXIST"],
rename_columns={},
)


def test_rename_columns(sample_dataframe):
"""Test that columns are correctly renamed."""
result_df = map_columns(
sample_dataframe, drop_columns=[], retain_columns=[], rename_columns={"B": "Z"}
)
assert "Z" in result_df.columns and "B" not in result_df.columns


def test_rename_columns_error(sample_dataframe):
"""Test raising error if a column does not exist."""
with pytest.raises(KeyError):
map_columns(
sample_dataframe,
drop_columns=[],
retain_columns=[],
rename_columns={"NOT_EXIST": "Z"},
)


def test_map_columns(sample_dataframe):
"""Test that columns are correctly mapped."""
result_df = map_columns(
sample_dataframe,
drop_columns=["C"],
retain_columns=["B"],
rename_columns={"B": "Z"},
)
assert list(result_df.columns) == ["Z"]