Skip to content

Commit

Permalink
feat: Add top_value_matches() function
Browse files Browse the repository at this point in the history
  • Loading branch information
roquelopez committed Aug 22, 2024
1 parent 7eaf9b1 commit d49375a
Show file tree
Hide file tree
Showing 4 changed files with 2,313 additions and 96 deletions.
276 changes: 184 additions & 92 deletions bdikit/api.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from __future__ import annotations
import logging
from enum import Enum
from os.path import join, dirname
from typing import (
Expand Down Expand Up @@ -62,6 +63,7 @@
GDC_DATA_PATH = join(dirname(__file__), "./resource/gdc_table.csv")
DEFAULT_VALUE_MATCHING_METHOD = "tfidf"
DEFAULT_SCHEMA_MATCHING_METHOD = "coma"
logger = logging.getLogger(__name__)


class SchemaMatchers(Enum):
Expand Down Expand Up @@ -308,69 +310,97 @@ def match_values(
ValueError: If the target is neither a DataFrame nor a standard vocabulary name.
ValueError: If the source column is not present in the source dataset.
"""
if isinstance(column_mapping, pd.DataFrame):
if not all(k in column_mapping.columns for k in ["source", "target"]):
raise ValueError(
"The column_mapping DataFrame must contain 'source' and 'target' columns."
)
mapping_df = column_mapping
elif isinstance(column_mapping, tuple):
mapping_df = pd.DataFrame(
[
{
"source": column_mapping[0],
"target": column_mapping[1],
}
]
)
else:
raise ValueError(
"The column_mapping must be a DataFrame or a tuple of two strings "
"containing the 'source' and 'target' columns."
)

column_mapping_list = mapping_df.to_dict(orient="records")

for mapping in column_mapping_list:
source_column = mapping["source"]
if source_column not in source.columns:
raise ValueError(
f"The source column '{source_column}' is not present in the source dataset."
)

if isinstance(target, str) and target == "gdc":
column_names = mapping_df["target"].unique().tolist()
target_domain = get_gdc_data(column_names)
elif isinstance(target, pd.DataFrame):
target_domain = {
column_name: target[column_name].unique().tolist()
for column_name in target.columns
}
else:
raise ValueError(
"The target must be a DataFrame or a standard vocabulary name."
)

if method_args is None:
method_args = {}
value_matcher = ValueMatchers.get_instance(method, **method_args)
matches = _match_values(source, target_domain, column_mapping_list, value_matcher)

result = [
_value_matching_result_to_df(matching_result) for matching_result in matches
]
if "top_n" in method_args and method_args["top_n"] > 1:
logger.warning(
f"Ignoring 'top_n' argument, use the 'top_value_matches()' method to get top-k value matches."
)
method_args["top_n"] = 1

matches = _match_values(source, target, column_mapping, method, method_args)

if isinstance(column_mapping, tuple):
if len(matches) == 0:
return pd.DataFrame(columns=["source", "target", "similarity"])
# If only a single mapping is provided (as a tuple), we return the result
# directly as a DataFrame to make it easier to display it in notebooks.
assert (
len(result) == 1
), f"Expected one result for a single column mapping, but got: {len(result)}"
return result[0]
len(matches) == 1
), f"Expected one result for a single column mapping, but got: {len(matches)}"
return matches[0]
else:
return result
return matches


def top_value_matches(
    source: pd.DataFrame,
    target: Union[str, pd.DataFrame],
    column_mapping: Union[Tuple[str, str], pd.DataFrame],
    top_k: int = 5,
    method: str = DEFAULT_VALUE_MATCHING_METHOD,
    method_args: Optional[Dict[str, Any]] = None,
) -> List[pd.DataFrame]:
    """
    Finds the top-k value matches between column values from the source dataset
    and column values of the target domain (a pd.DataFrame or a standard
    dictionary such as 'gdc') using the method provided in `method`.

    Args:
        source (pd.DataFrame): The source dataset containing the columns to be
            matched.
        target (Union[str, pd.DataFrame]): The target domain to match the
            values to. It can be either a DataFrame or a standard vocabulary name.
        column_mapping (Union[Tuple[str, str], pd.DataFrame]): A tuple or a
            DataFrame containing the mappings between source and target columns.

            - If a tuple is provided, it should contain two strings where the first
              is the source column and the second is the target column.
            - If a DataFrame is provided, it should contain 'source' and 'target'
              column names where each row specifies a column mapping.

        top_k (int, optional): The number of top matches to return. Defaults to 5.
        method (str, optional): The name of the method to use for value
            matching.
        method_args (Dict[str, Any], optional): The additional arguments of the
            method for value matching. Any 'top_n' entry is ignored in favor of
            `top_k`.

    Returns:
        List[pd.DataFrame]: A list of DataFrame objects containing the results
        of value matching between the source and target values, one DataFrame
        per source value group, each sorted by similarity in descending order.

    Raises:
        ValueError: If the column_mapping DataFrame does not contain 'source' and
            'target' columns.
        ValueError: If the target is neither a DataFrame nor a standard vocabulary name.
        ValueError: If the source column is not present in the source dataset.
    """
    # Copy before overriding 'top_n' so the caller's dictionary is not mutated.
    method_args = dict(method_args) if method_args else {}

    if "top_n" in method_args:
        # Lazy %-formatting: the message is only rendered if the record is emitted.
        logger.warning(
            "Ignoring 'top_n' argument, using top_k argument instead (top_k=%s)",
            top_k,
        )

    method_args["top_n"] = top_k

    matches = _match_values(source, target, column_mapping, method, method_args)

    match_list = []
    for match in matches:
        # dropna=False keeps groups whose source value is NaN.
        for _, group in match.groupby("source", dropna=False):
            # Sort first, then reset the index, so each group carries a clean
            # 0..n-1 index in descending-similarity order.
            match_list.append(
                group.sort_values(by=["similarity"], ascending=False).reset_index(
                    drop=True
                )
            )

    return match_list


def view_value_matches(
Expand Down Expand Up @@ -407,54 +437,29 @@ def view_value_matches(
display(match)


def _value_matching_result_to_df(
    matching_result: ValueMatchingResult, default_unmatched: Any = np.nan
) -> pd.DataFrame:
    """
    Transforms the list of matches and unmatched values into a DataFrame.

    The resulting DataFrame has the columns 'source', 'target', and
    'similarity'. Matched values come first; unmatched source values follow
    with `default_unmatched` (NaN by default) in the 'target' and
    'similarity' columns. The source/target column names and the coverage
    score from `matching_result` are attached as DataFrame `attrs`.
    """
    matches_df = pd.DataFrame(
        data=matching_result["matches"],
        columns=["source", "target", "similarity"],
    )

    unmatched_values = matching_result["unmatch_values"]
    # Pair each unmatched source value with placeholder target/similarity.
    unmatched_df = pd.DataFrame(
        data=list(
            zip(
                unmatched_values,
                [default_unmatched] * len(unmatched_values),
                [default_unmatched] * len(unmatched_values),
            )
        ),
        columns=["source", "target", "similarity"],
    )

    result = pd.concat([matches_df, unmatched_df], ignore_index=True)
    # NOTE: DataFrame.attrs is experimental metadata; downstream code (e.g.
    # view_value_matches) reads these keys for display.
    result.attrs["source"] = matching_result["source"]
    result.attrs["target"] = matching_result["target"]
    result.attrs["coverage"] = matching_result["coverage"]
    return result


def _match_values(
dataset: pd.DataFrame,
target_domain: Dict[str, Optional[List[str]]],
column_mapping: List[Dict],
value_matcher: BaseValueMatcher,
) -> List[ValueMatchingResult]:
source: pd.DataFrame,
target: Union[str, pd.DataFrame],
column_mapping: Union[Tuple[str, str], pd.DataFrame],
method: str,
method_args: Dict[str, Any],
) -> List[pd.DataFrame]:

target_domain, column_mapping_list = _format_value_matching_input(
source, target, column_mapping
)
value_matcher = ValueMatchers.get_instance(method, **method_args)
mapping_results: List[ValueMatchingResult] = []

for mapping in column_mapping:
for mapping in column_mapping_list:
source_column, target_column = mapping["source"], mapping["target"]

# 1. Select candidate columns for value mapping
target_domain_list = target_domain[target_column]
if target_domain_list is None or len(target_domain_list) == 0:
continue

unique_values = dataset[source_column].unique()
unique_values = source[source_column].unique()
if _skip_values(unique_values):
continue

Expand All @@ -481,9 +486,9 @@ def _match_values(
)

# 5. Calculate the coverage and unmatched values
coverage = len(matches) / len(source_values_dict)
source_values = set(source_values_dict.values())
match_values = set([x[0] for x in matches])
coverage = len(match_values) / len(source_values_dict)

mapping_results.append(
ValueMatchingResult(
Expand All @@ -496,7 +501,94 @@ def _match_values(
)
)

return mapping_results
mapping_df_list = [
_value_matching_result_to_df(mapping_result)
for mapping_result in mapping_results
]

return mapping_df_list


def _format_value_matching_input(
source: pd.DataFrame,
target: Union[str, pd.DataFrame],
column_mapping: Union[Tuple[str, str], pd.DataFrame],
):
if isinstance(column_mapping, pd.DataFrame):
if not all(k in column_mapping.columns for k in ["source", "target"]):
raise ValueError(
"The column_mapping DataFrame must contain 'source' and 'target' columns."
)
mapping_df = column_mapping
elif isinstance(column_mapping, tuple):
mapping_df = pd.DataFrame(
[
{
"source": column_mapping[0],
"target": column_mapping[1],
}
]
)
else:
raise ValueError(
"The column_mapping must be a DataFrame or a tuple of two strings "
"containing the 'source' and 'target' columns."
)

column_mapping_list = mapping_df.to_dict(orient="records")

for mapping in column_mapping_list:
source_column = mapping["source"]
if source_column not in source.columns:
raise ValueError(
f"The source column '{source_column}' is not present in the source dataset."
)

if isinstance(target, str) and target == "gdc":
column_names = mapping_df["target"].unique().tolist()
target_domain = get_gdc_data(column_names)
elif isinstance(target, pd.DataFrame):
target_domain = {
column_name: target[column_name].unique().tolist()
for column_name in target.columns
}
else:
raise ValueError(
"The target must be a DataFrame or a standard vocabulary name."
)

return target_domain, column_mapping_list


def _value_matching_result_to_df(
matching_result: ValueMatchingResult, default_unmatched: Any = np.nan
) -> pd.DataFrame:
"""
Transforms the list of matches and unmatched values into a DataFrame.
"""
matches_df = pd.DataFrame(
data=matching_result["matches"],
columns=["source", "target", "similarity"],
)

unmatched_values = matching_result["unmatch_values"]

unmatched_df = pd.DataFrame(
data=list(
zip(
unmatched_values,
[default_unmatched] * len(unmatched_values),
[default_unmatched] * len(unmatched_values),
)
),
columns=["source", "target", "similarity"],
)

result = pd.concat([matches_df, unmatched_df], ignore_index=True)
result.attrs["source"] = matching_result["source"]
result.attrs["target"] = matching_result["target"]
result.attrs["coverage"] = matching_result["coverage"]
return result


def _skip_values(unique_values: np.ndarray, max_length: int = 50):
Expand Down Expand Up @@ -628,7 +720,7 @@ def check_duplicates(mappings: List[ColumnMappingSpec]):
source_column = mapping["source"]
target_column = mapping["target"]

# ignore duplicate mappings accross user and value mappings
# ignore duplicate mappings across user and value mappings
key = create_key(source_column, target_column)
if key in mapping_keys:
continue
Expand Down
3 changes: 2 additions & 1 deletion docs/source/examples.rst
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ Examples

Here you can find different Jupyter notebook examples about how to use `bdi-kit`:

- `Analyzing one attribute/column at a time <https://github.com/VIDA-NYU/bdi-kit/blob/devel/examples/analyzing_one_attribute.ipynb>`__
- `Changing the parameters of the matching methods <https://github.com/VIDA-NYU/bdi-kit/blob/devel/examples/changing_parameters.ipynb>`__
- `Getting the top-k value matches <https://github.com/VIDA-NYU/bdi-kit/blob/devel/examples/top_k_matches.ipynb>`__
- `Analyzing one attribute/column at a time <https://github.com/VIDA-NYU/bdi-kit/blob/devel/examples/analyzing_one_attribute.ipynb>`__
- `Exploring schema and value matching through a visualization tool <https://github.com/VIDA-NYU/bdi-kit/blob/devel/examples/schema_matching_heatmap.ipynb>`__
Loading

0 comments on commit d49375a

Please sign in to comment.