Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Support for top-k value matches #80

Open
wants to merge 4 commits into
base: devel
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
303 changes: 202 additions & 101 deletions bdikit/api.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from __future__ import annotations
import logging
from enum import Enum
from collections import defaultdict
from os.path import join, dirname
from typing import (
Union,
Expand Down Expand Up @@ -62,6 +64,7 @@
GDC_DATA_PATH = join(dirname(__file__), "./resource/gdc_table.csv")
DEFAULT_VALUE_MATCHING_METHOD = "tfidf"
DEFAULT_SCHEMA_MATCHING_METHOD = "coma"
logger = logging.getLogger(__name__)


class SchemaMatchers(Enum):
Expand Down Expand Up @@ -308,69 +311,97 @@ def match_values(
ValueError: If the target is neither a DataFrame nor a standard vocabulary name.
ValueError: If the source column is not present in the source dataset.
"""
if isinstance(column_mapping, pd.DataFrame):
if not all(k in column_mapping.columns for k in ["source", "target"]):
raise ValueError(
"The column_mapping DataFrame must contain 'source' and 'target' columns."
)
mapping_df = column_mapping
elif isinstance(column_mapping, tuple):
mapping_df = pd.DataFrame(
[
{
"source": column_mapping[0],
"target": column_mapping[1],
}
]
)
else:
raise ValueError(
"The column_mapping must be a DataFrame or a tuple of two strings "
"containing the 'source' and 'target' columns."
)

column_mapping_list = mapping_df.to_dict(orient="records")

for mapping in column_mapping_list:
source_column = mapping["source"]
if source_column not in source.columns:
raise ValueError(
f"The source column '{source_column}' is not present in the source dataset."
)

if isinstance(target, str) and target == "gdc":
column_names = mapping_df["target"].unique().tolist()
target_domain = get_gdc_data(column_names)
elif isinstance(target, pd.DataFrame):
target_domain = {
column_name: target[column_name].unique().tolist()
for column_name in target.columns
}
else:
raise ValueError(
"The target must be a DataFrame or a standard vocabulary name."
)

if method_args is None:
method_args = {}
value_matcher = ValueMatchers.get_instance(method, **method_args)
matches = _match_values(source, target_domain, column_mapping_list, value_matcher)

result = [
_value_matching_result_to_df(matching_result) for matching_result in matches
]
if "top_n" in method_args and method_args["top_n"] > 1:
logger.warning(
f"Ignoring 'top_n' argument, use the 'top_value_matches()' method to get top-k value matches."
)
method_args["top_n"] = 1

matches = _match_values(source, target, column_mapping, method, method_args)

if isinstance(column_mapping, tuple):
if len(matches) == 0:
return pd.DataFrame(columns=["source", "target", "similarity"])
# If only a single mapping is provided (as a tuple), we return the result
# directly as a DataFrame to make it easier to display it in notebooks.
assert (
len(result) == 1
), f"Expected one result for a single column mapping, but got: {len(result)}"
return result[0]
len(matches) == 1
), f"Expected one result for a single column mapping, but got: {len(matches)}"
return matches[0]
else:
return result
return matches


def top_value_matches(
    source: pd.DataFrame,
    target: Union[str, pd.DataFrame],
    column_mapping: Union[Tuple[str, str], pd.DataFrame],
    top_k: int = 5,
    method: str = DEFAULT_VALUE_MATCHING_METHOD,
    method_args: Optional[Dict[str, Any]] = None,
) -> List[pd.DataFrame]:
    """
    Finds top value matches between column values from the source dataset and column
    values of the target domain (a pd.DataFrame or a standard dictionary such
    as 'gdc') using the method provided in `method`.

    Args:
        source (pd.DataFrame): The source dataset containing the columns to be
            matched.

        target (Union[str, pd.DataFrame]): The target domain to match the
            values to. It can be either a DataFrame or a standard vocabulary name.

        column_mapping (Union[Tuple[str, str], pd.DataFrame]): A tuple or a
            DataFrame containing the mappings between source and target columns.

            - If a tuple is provided, it should contain two strings where the first
              is the source column and the second is the target column.
            - If a DataFrame is provided, it should contain 'source' and 'target'
              column names where each row specifies a column mapping.

        top_k (int, optional): The number of top matches to return. Defaults to 5.

        method (str, optional): The name of the method to use for value
            matching.
        method_args (Dict[str, Any], optional): The additional arguments of the
            method for value matching. Any 'top_n' entry is ignored in favor of
            `top_k`. This dictionary is not modified by this function.

    Returns:
        List[pd.DataFrame]: A list of DataFrame objects containing
        the results of value matching between the source and target values,
        one DataFrame per source value, sorted by similarity (best first).

    Raises:
        ValueError: If the column_mapping DataFrame does not contain 'source' and
            'target' columns.
        ValueError: If the target is neither a DataFrame nor a standard vocabulary name.
        ValueError: If the source column is not present in the source dataset.
    """
    if method_args is None:
        method_args = {}
    else:
        # Copy before overriding 'top_n' so the caller's dict is not mutated.
        method_args = dict(method_args)

    if "top_n" in method_args:
        logger.warning(
            f"Ignoring 'top_n' argument, using top_k argument instead (top_k={top_k})"
        )

    method_args["top_n"] = top_k

    matches = _match_values(source, target, column_mapping, method, method_args)

    # Split each column-mapping result into one DataFrame per source value,
    # ordered by similarity so the best candidate comes first.
    match_list = []
    for match in matches:
        for _, group in match.groupby("source", dropna=False):
            match_list.append(
                group.reset_index(drop=True).sort_values(
                    by=["similarity"], ascending=False
                )
            )

    return match_list


def view_value_matches(
Expand All @@ -389,72 +420,55 @@ def view_value_matches(
match_list = [matches]
elif isinstance(matches, list):
match_list = matches

else:
raise ValueError("The matches must be a DataFrame or a list of DataFrames")

for match in match_list:
# Grouping DataFrames by metadata (source and target columns)
grouped_matches = defaultdict(list)
for match_df in match_list:
grouped_matches[match_df.attrs["source"], match_df.attrs["target"]].append(
match_df
)

# Display grouped DataFrames
for (source_col, target_col), match_dfs in grouped_matches.items():
display(
Markdown(
f"<br>**Source column:** {match.attrs['source']}<br>"
f"**Target column:** {match.attrs['target']}<br>"
f"<br>**Source column:** {source_col}<br>"
f"**Target column:** {target_col}<br>"
)
)
if edit:
match_widget = pn.widgets.Tabulator(match, disabled=not edit)
display(match_widget)
else:
display(match)


def _value_matching_result_to_df(
    matching_result: ValueMatchingResult, default_unmatched: Any = np.nan
) -> pd.DataFrame:
    """
    Transforms the list of matches and unmatched values into a DataFrame.

    The result has columns ['source', 'target', 'similarity']: one row per
    match from ``matching_result["matches"]``, followed by one row per
    unmatched source value with `default_unmatched` filled in for both the
    target and similarity columns. The source/target column names and the
    coverage metric are attached as DataFrame ``attrs`` metadata.
    """
    matches_df = pd.DataFrame(
        data=matching_result["matches"],
        columns=["source", "target", "similarity"],
    )

    unmatched_values = matching_result["unmatch_values"]
    # Build (value, placeholder, placeholder) rows for unmatched source values.
    unmatched_df = pd.DataFrame(
        data=list(
            zip(
                unmatched_values,
                [default_unmatched] * len(unmatched_values),
                [default_unmatched] * len(unmatched_values),
            )
        ),
        columns=["source", "target", "similarity"],
    )

    result = pd.concat([matches_df, unmatched_df], ignore_index=True)
    # attrs carry per-result metadata consumed by view_value_matches().
    result.attrs["source"] = matching_result["source"]
    result.attrs["target"] = matching_result["target"]
    result.attrs["coverage"] = matching_result["coverage"]
    return result
for match_df in match_dfs:
if edit:
match_widget = pn.widgets.Tabulator(match_df, disabled=not edit)
display(match_widget)
else:
display(match_df)


def _match_values(
dataset: pd.DataFrame,
target_domain: Dict[str, Optional[List[str]]],
column_mapping: List[Dict],
value_matcher: BaseValueMatcher,
) -> List[ValueMatchingResult]:
source: pd.DataFrame,
target: Union[str, pd.DataFrame],
column_mapping: Union[Tuple[str, str], pd.DataFrame],
method: str,
method_args: Dict[str, Any],
) -> List[pd.DataFrame]:

target_domain, column_mapping_list = _format_value_matching_input(
source, target, column_mapping
)
value_matcher = ValueMatchers.get_instance(method, **method_args)
mapping_results: List[ValueMatchingResult] = []

for mapping in column_mapping:
for mapping in column_mapping_list:
source_column, target_column = mapping["source"], mapping["target"]

# 1. Select candidate columns for value mapping
target_domain_list = target_domain[target_column]
if target_domain_list is None or len(target_domain_list) == 0:
continue

unique_values = dataset[source_column].unique()
unique_values = source[source_column].unique()
if _skip_values(unique_values):
continue

Expand All @@ -481,9 +495,9 @@ def _match_values(
)

# 5. Calculate the coverage and unmatched values
coverage = len(matches) / len(source_values_dict)
source_values = set(source_values_dict.values())
match_values = set([x[0] for x in matches])
coverage = len(match_values) / len(source_values_dict)

mapping_results.append(
ValueMatchingResult(
Expand All @@ -496,7 +510,94 @@ def _match_values(
)
)

return mapping_results
mapping_df_list = [
_value_matching_result_to_df(mapping_result)
for mapping_result in mapping_results
]

return mapping_df_list


def _format_value_matching_input(
    source: pd.DataFrame,
    target: Union[str, pd.DataFrame],
    column_mapping: Union[Tuple[str, str], pd.DataFrame],
):
    """
    Validates and normalizes the inputs of the value-matching functions.

    Returns a tuple ``(target_domain, column_mapping_list)`` where
    `target_domain` maps each target column name to its list of unique values
    and `column_mapping_list` is a list of ``{"source": ..., "target": ...}``
    records.

    Raises:
        ValueError: If the column mapping is malformed, a source column is
            missing from the source dataset, or the target is neither a
            DataFrame nor a standard vocabulary name.
    """
    # Normalize the column mapping into a DataFrame with source/target columns.
    if isinstance(column_mapping, pd.DataFrame):
        if any(k not in column_mapping.columns for k in ("source", "target")):
            raise ValueError(
                "The column_mapping DataFrame must contain 'source' and 'target' columns."
            )
        mappings = column_mapping
    elif isinstance(column_mapping, tuple):
        src_col, tgt_col = column_mapping[0], column_mapping[1]
        mappings = pd.DataFrame([{"source": src_col, "target": tgt_col}])
    else:
        raise ValueError(
            "The column_mapping must be a DataFrame or a tuple of two strings "
            "containing the 'source' and 'target' columns."
        )

    column_mapping_list = mappings.to_dict(orient="records")

    # Every mapped source column must exist in the source dataset.
    for record in column_mapping_list:
        source_column = record["source"]
        if source_column not in source.columns:
            raise ValueError(
                f"The source column '{source_column}' is not present in the source dataset."
            )

    # Build the target domain from the GDC vocabulary or the target DataFrame.
    if isinstance(target, str) and target == "gdc":
        target_domain = get_gdc_data(mappings["target"].unique().tolist())
    elif isinstance(target, pd.DataFrame):
        target_domain = {
            column: target[column].unique().tolist() for column in target.columns
        }
    else:
        raise ValueError(
            "The target must be a DataFrame or a standard vocabulary name."
        )

    return target_domain, column_mapping_list


def _value_matching_result_to_df(
    matching_result: ValueMatchingResult, default_unmatched: Any = np.nan
) -> pd.DataFrame:
    """
    Converts a value-matching result into a single DataFrame with columns
    ['source', 'target', 'similarity']. Matched pairs come first; unmatched
    source values follow with `default_unmatched` as both target and
    similarity. Source/target column names and coverage are attached as
    DataFrame ``attrs`` metadata.
    """
    columns = ["source", "target", "similarity"]

    matched = pd.DataFrame(data=matching_result["matches"], columns=columns)

    # One placeholder row per unmatched source value.
    unmatched_rows = [
        (value, default_unmatched, default_unmatched)
        for value in matching_result["unmatch_values"]
    ]
    unmatched = pd.DataFrame(data=unmatched_rows, columns=columns)

    combined = pd.concat([matched, unmatched], ignore_index=True)
    # attrs carry per-result metadata consumed by view_value_matches().
    combined.attrs["source"] = matching_result["source"]
    combined.attrs["target"] = matching_result["target"]
    combined.attrs["coverage"] = matching_result["coverage"]
    return combined


def _skip_values(unique_values: np.ndarray, max_length: int = 50):
Expand Down Expand Up @@ -628,7 +729,7 @@ def check_duplicates(mappings: List[ColumnMappingSpec]):
source_column = mapping["source"]
target_column = mapping["target"]

# ignore duplicate mappings accross user and value mappings
# ignore duplicate mappings across user and value mappings
key = create_key(source_column, target_column)
if key in mapping_keys:
continue
Expand Down
Loading
Loading